Streaming

Streaming allows you to receive responses token by token as they’re generated, enabling real-time output in your applications.

Basic Streaming

from lunar import Lunar

client = Lunar()

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a short story about a robot."}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Stream Response Structure

Each chunk is a ChatCompletionChunk object:
Field     Type   Description
id        str    Completion identifier
object    str    Always "chat.completion.chunk"
created   int    Unix timestamp
model     str    Model used
choices   list   List of streaming choices
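For example, the top-level fields can be read directly off a chunk. This is a minimal sketch that assumes the stream created in Basic Streaming; the printed values are illustrative, not actual output:

# Inspect the metadata on the first chunk (values shown in comments are illustrative)
first_chunk = next(iter(stream))

print(first_chunk.id)       # e.g. "chatcmpl-abc123"
print(first_chunk.object)   # "chat.completion.chunk"
print(first_chunk.created)  # e.g. 1718000000 (Unix timestamp)
print(first_chunk.model)    # "gpt-4o-mini"
print(first_chunk.choices)  # list with one streaming choice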

Choice Delta

chunk.choices[0].index          # Position in choices list
chunk.choices[0].delta.role     # Role (only in first chunk)
chunk.choices[0].delta.content  # Content fragment
chunk.choices[0].finish_reason  # None until final chunk, then "stop"

Collecting Full Response

full_response = ""

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        full_response += content
        print(content, end="", flush=True)

print()  # New line
print(f"Complete: {full_response}")

Async Streaming

import asyncio

from lunar import AsyncLunar

async def stream_response():
    async with AsyncLunar() as client:
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Tell me a joke."}],
            stream=True
        )

        async for chunk in stream:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_response())

Detecting Stream End

for chunk in stream:
    # Check for final chunk
    if chunk.choices[0].finish_reason:
        print(f"\nStream ended: {chunk.choices[0].finish_reason}")
        break

    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

With Fallbacks

Streaming works with fallbacks. If the primary model fails, the SDK automatically tries the next model:
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    fallbacks=["claude-3-haiku", "llama-3.1-8b"]
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

Streaming with Error Handling

from lunar import Lunar, RateLimitError, ServerError

client = Lunar()

try:
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True
    )

    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)

except RateLimitError as e:
    print(f"Rate limited. Retry after: {e.retry_after}s")
except ServerError as e:
    print(f"Server error: {e}")

When to Use Streaming

Use Case               Stream?
Chat interfaces        Yes
Real-time output       Yes
Long-form generation   Yes
Batch processing       No
API integrations       Depends

Performance Considerations

  • TTFT (Time to First Token): streaming delivers the first token much sooner, so perceived response time drops
  • Total latency: roughly the same as non-streaming; the model generates at the same speed either way
  • Memory: streaming avoids buffering the entire response, so long outputs use less memory
# Non-streaming: Wait for full response
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a long essay."}]
)
# User waits 10 seconds, then sees everything

# Streaming: See output immediately
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a long essay."}],
    stream=True
)
# User sees first token in ~300ms, continues for 10 seconds
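To see the difference concretely, you can time the first token against the full response. This is a rough sketch that assumes the client from the examples above and uses time.perf_counter for wall-clock timing:

import time

start = time.perf_counter()
first_token_at = None

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a long essay."}],
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        if first_token_at is None:
            # Record TTFT when the first content fragment arrives
            first_token_at = time.perf_counter() - start
        print(content, end="", flush=True)

total = time.perf_counter() - start
print(f"\nTTFT: {first_token_at:.2f}s, total: {total:.2f}s")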