
Overview

Streaming allows you to receive chat completion responses incrementally as they are generated, rather than waiting for the complete response. This is useful for providing real-time feedback to users and reducing perceived latency.

Enable streaming

To enable streaming, set stream=True when calling client.chat.completions.create():
from dedalus_labs import Dedalus

client = Dedalus()

stream = client.chat.completions.create(
    model="openai/gpt-5-nano",
    stream=True,
    messages=[
        {
            "role": "system",
            "content": "You are Stephen Dedalus. Respond in morose Joycean malaise."
        },
        {
            "role": "user",
            "content": "What do you think of artificial intelligence?"
        }
    ]
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

Response structure

When streaming is enabled, the API returns a Stream[ChatCompletionChunk] instead of a ChatCompletion object.

ChatCompletionChunk fields

id
string
required
A unique identifier for the chat completion. Each chunk has the same ID.
object
string
required
The object type, which is always chat.completion.chunk.
created
int
required
The Unix timestamp (in seconds) of when the chat completion was created. Each chunk has the same timestamp.
model
string
required
The model used to generate the completion.
choices
array
required
A list of chat completion choices. Can contain more than one element if n is greater than 1. Can also be empty for the last chunk if you set stream_options: {"include_usage": true}. Each choice contains:
  • index: The index of this choice
  • delta: The incremental message content with role and/or content
  • finish_reason: "stop" | "length" | "tool_calls" | "content_filter" | null
  • logprobs: Log probability information (if requested)
usage
object
Usage statistics for the completion request. Only present in the final chunk when stream_options: {"include_usage": true} is set. Fields:
  • prompt_tokens: Number of tokens in the prompt
  • completion_tokens: Number of tokens in the completion
  • total_tokens: Total tokens used
  • completion_tokens_details: Breakdown of completion tokens
  • prompt_tokens_details: Breakdown of prompt tokens
system_fingerprint
string
This fingerprint represents the backend configuration that the model runs with. Can be used in conjunction with the seed request parameter to understand when backend changes have been made that might impact determinism.
service_tier
string
The processing type used for serving the request.
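The field behavior described above can be illustrated with a small sketch. The chunk dicts below are hypothetical sample data mirroring the fields listed here, not real API output; with the real SDK you would read the same fields as attributes on each ChatCompletionChunk.

```python
# Hypothetical sample chunks shaped like ChatCompletionChunk (not real API output).
chunks = [
    {"id": "chatcmpl-abc", "object": "chat.completion.chunk", "created": 1700000000,
     "model": "openai/gpt-5-nano",
     "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]},
    {"id": "chatcmpl-abc", "object": "chat.completion.chunk", "created": 1700000000,
     "model": "openai/gpt-5-nano",
     "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": None}]},
    {"id": "chatcmpl-abc", "object": "chat.completion.chunk", "created": 1700000000,
     "model": "openai/gpt-5-nano",
     "choices": [{"index": 0, "delta": {"content": " world"}, "finish_reason": "stop"}]},
]

# id and created are identical across every chunk of one completion.
assert len({c["id"] for c in chunks}) == 1
assert len({c["created"] for c in chunks}) == 1

# Content accumulates from the deltas; finish_reason appears on the final choice.
text = "".join(c["choices"][0]["delta"].get("content", "") for c in chunks)
finish = chunks[-1]["choices"][0]["finish_reason"]
print(text, finish)  # Hello world stop
```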

Stream options

You can configure streaming behavior using the stream_options parameter:
stream = client.chat.completions.create(
    model="openai/gpt-5-nano",
    stream=True,
    stream_options={"include_usage": True},
    messages=[
        {"role": "user", "content": "Hello!"}
    ]
)
stream_options.include_usage
boolean
When set to true, the final chunk will include usage statistics in the usage field.
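Because the usage-bearing final chunk has an empty choices list, a consuming loop should guard before indexing choices. A minimal sketch of that pattern, using hypothetical chunk dicts in place of real ChatCompletionChunk objects:

```python
# Hypothetical chunk dicts: with include_usage, the final chunk carries usage
# and an empty choices list, so guard before indexing into choices.
chunks = [
    {"choices": [{"delta": {"content": "Hi"}, "finish_reason": None}], "usage": None},
    {"choices": [{"delta": {"content": "!"}, "finish_reason": "stop"}], "usage": None},
    {"choices": [], "usage": {"prompt_tokens": 5, "completion_tokens": 2, "total_tokens": 7}},
]

text = ""
usage = None
for chunk in chunks:
    if chunk["choices"] and chunk["choices"][0]["delta"].get("content"):
        text += chunk["choices"][0]["delta"]["content"]
    if chunk["usage"] is not None:
        usage = chunk["usage"]

print(text)                   # Hi!
print(usage["total_tokens"])  # 7
```

With the real stream the same structure applies: check `chunk.choices` is non-empty before reading the delta, and read `chunk.usage` when it is not None.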

Async streaming

The async client provides the same streaming interface using async for:
from dedalus_labs import AsyncDedalus

client = AsyncDedalus()

stream = await client.chat.completions.create(
    model="openai/gpt-5-nano",
    stream=True,
    messages=[
        {
            "role": "system",
            "content": "You are Stephen Dedalus. Respond in morose Joycean malaise."
        },
        {
            "role": "user",
            "content": "What do you think of artificial intelligence?"
        }
    ]
)

async for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

Handling deltas

Each chunk contains a delta object instead of a complete message. The delta carries only what has changed since the previous chunk:
for chunk in stream:
    delta = chunk.choices[0].delta
    
    # First chunk typically contains the role
    if delta.role:
        print(f"Role: {delta.role}")
    
    # Subsequent chunks contain content
    if delta.content:
        print(delta.content, end="")
    
    # Tool calls are also streamed incrementally
    if delta.tool_calls:
        for tool_call in delta.tool_calls:
            print(f"Tool call: {tool_call.function.name}")

Complete example with accumulation

Here’s how to accumulate the streamed chunks into a complete message:
from dedalus_labs import Dedalus

client = Dedalus()

stream = client.chat.completions.create(
    model="openai/gpt-5-nano",
    stream=True,
    messages=[
        {"role": "user", "content": "Tell me a short story"}
    ]
)

complete_message = ""

for chunk in stream:
    delta = chunk.choices[0].delta
    
    if delta.content:
        complete_message += delta.content
        print(delta.content, end="", flush=True)
    
    # Check if stream is done
    if chunk.choices[0].finish_reason:
        print(f"\n\nFinish reason: {chunk.choices[0].finish_reason}")

print(f"\n\nComplete message:\n{complete_message}")

Streaming with function calls

When using tools/functions, the tool calls are also streamed incrementally:
stream = client.chat.completions.create(
    model="openai/gpt-5",
    stream=True,
    messages=[
        {"role": "user", "content": "What's the weather in Boston?"}
    ],
    tools=[
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Get the current weather in a location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {"type": "string"}
                    },
                    "required": ["location"]
                }
            }
        }
    ]
)

for chunk in stream:
    delta = chunk.choices[0].delta
    
    if delta.tool_calls:
        for tool_call in delta.tool_calls:
            if tool_call.function.name:
                print(f"Calling: {tool_call.function.name}")
            if tool_call.function.arguments:
                print(f"Args: {tool_call.function.arguments}", end="")
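Since arguments arrive as partial JSON strings, they must be accumulated per tool call (keyed by the tool call's index) and parsed only after the stream ends. A sketch of that accumulation, using hypothetical (index, name, arguments) fragments in place of real delta.tool_calls entries:

```python
import json

# Hypothetical streamed fragments: one (index, name, arguments) tuple per
# delta.tool_calls entry. The name arrives once; arguments arrive as partial
# JSON strings that must be concatenated in order.
fragments = [
    (0, "get_weather", ""),
    (0, None, '{"loc'),
    (0, None, 'ation": "Boston"}'),
]

calls = {}  # index -> {"name": ..., "arguments": ...}
for index, name, args in fragments:
    call = calls.setdefault(index, {"name": None, "arguments": ""})
    if name:
        call["name"] = name
    if args:
        call["arguments"] += args

# Parse only after the stream ends, once the JSON is complete.
parsed = {i: json.loads(c["arguments"]) for i, c in calls.items()}
print(calls[0]["name"], parsed[0])  # get_weather {'location': 'Boston'}
```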

Error handling

Handle errors during streaming using try-except:
import dedalus_labs
from dedalus_labs import Dedalus

client = Dedalus()

try:
    stream = client.chat.completions.create(
        model="openai/gpt-5-nano",
        stream=True,
        messages=[
            {"role": "user", "content": "Hello!"}
        ]
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
            
except dedalus_labs.APIConnectionError as e:
    print(f"Connection error: {e}")
except dedalus_labs.APIStatusError as e:
    print(f"API error {e.status_code}: {e.response}")

Parameters

All parameters from the completions endpoint are supported when streaming. The only difference is setting stream=True.
stream
boolean
required
Set to true to enable streaming responses via Server-Sent Events.
For all other parameters, see the completions documentation.

Best practices

  1. Flush output: Use flush=True with print() or flush output streams regularly to ensure content appears immediately
  2. Handle partial JSON: When streaming tool calls, arguments may arrive across multiple chunks; accumulate them before parsing
  3. Check finish_reason: Always check the finish_reason to understand why the stream ended
  4. Include usage stats: Set stream_options: {"include_usage": true} if you need token usage information, and remember the final usage chunk has an empty choices list
  5. Error handling: Wrap streaming code in try-except blocks to handle network errors gracefully
