Overview
The Dedalus SDK supports streaming responses for chat completions using Server-Sent Events (SSE). Streaming allows you to receive tokens as they're generated, providing a better user experience for long-form content.

Basic streaming
- Synchronous
- Asynchronous
from dedalus_labs import Dedalus

client = Dedalus(api_key="your-api-key")

# Passing stream=True makes create() return an iterator of SSE chunks
# instead of a single completion object.
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a short story"}],
    stream=True
)

# Each chunk carries an incremental delta; content can be empty on some
# chunks (e.g. role-only or finish chunks), so guard before printing.
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
import asyncio

from dedalus_labs import AsyncDedalus


async def main():
    client = AsyncDedalus(api_key="your-api-key")

    # Same call shape as the sync client, but awaited; the result is an
    # async iterator of chunks.
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a short story"}],
        stream=True
    )

    # Consume with `async for`; delta.content can be empty on some chunks.
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)


asyncio.run(main())
Streaming with context manager
Use context managers to ensure proper resource cleanup:

- Synchronous
- Asynchronous
from dedalus_labs import Dedalus

client = Dedalus(api_key="your-api-key")

# stream() returns a context manager that closes the stream and the
# underlying HTTP connection on exit, even if an exception occurs.
with client.chat.completions.stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "Say hello there!"}],
) as stream:
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

    # Get the final accumulated completion
    completion = stream.get_final_completion()
    print(f"\n\nFull response: {completion.choices[0].message.content}")
import asyncio

from dedalus_labs import AsyncDedalus


async def main():
    client = AsyncDedalus(api_key="your-api-key")

    # Async variant: `async with` ensures the stream and HTTP connection
    # are closed on exit, even if an exception occurs.
    async with client.chat.completions.stream(
        model="gpt-4",
        messages=[{"role": "user", "content": "Say hello there!"}],
    ) as stream:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="")

        # Get the final accumulated completion
        completion = await stream.get_final_completion()
        print(f"\n\nFull response: {completion.choices[0].message.content}")


asyncio.run(main())
The context manager automatically closes the stream and underlying HTTP connection when exiting, even if an exception occurs.
Structured output streaming
Stream responses with Pydantic model parsing:

from dedalus_labs import Dedalus
from pydantic import BaseModel
from typing_extensions import Literal


# Schema the model's output is parsed into via response_format.
class Location(BaseModel):
    city: str
    temperature: float
    units: Literal["c", "f"]


client = Dedalus(api_key="your-api-key")

with client.chat.completions.stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather in SF?"}],
    response_format=Location,
) as stream:
    for event in stream:
        # Stream events include type information
        print(f"Event type: {event.type}")

    completion = stream.get_final_completion()

    # Parsed Pydantic model
    location = completion.choices[0].message.parsed
    print(f"City: {location.city}")
    print(f"Temperature: {location.temperature}°{location.units}")
Stream events
When using structured streaming, you receive typed events:

from dedalus_labs import Dedalus
from dedalus_labs.lib.streaming.chat import (
    ContentDoneEvent,
    RefusalDoneEvent,
    FunctionToolCallArgumentsDoneEvent,
)
from pydantic import BaseModel


class Location(BaseModel):
    city: str
    temperature: float


client = Dedalus(api_key="your-api-key")

with client.chat.completions.stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "Weather in London?"}],
    response_format=Location,
) as stream:
    # Dispatch on event.type; the isinstance asserts narrow the event to
    # its concrete class so typed fields (parsed, refusal) are available.
    for event in stream:
        if event.type == "content.delta":
            # Incremental content update
            print(event.delta, end="")
        elif event.type == "content.done":
            # Content complete with parsed model
            assert isinstance(event, ContentDoneEvent)
            if event.parsed:
                print(f"\nParsed: {event.parsed}")
        elif event.type == "refusal.done":
            # Model refused the request
            assert isinstance(event, RefusalDoneEvent)
            print(f"Refusal: {event.refusal}")
Available event types
- content.delta — Incremental content update
- content.done — Content streaming complete
- refusal.delta — Incremental refusal message
- refusal.done — Refusal complete
- tool_calls.function.arguments.delta — Tool arguments update
- tool_calls.function.arguments.done — Tool arguments complete
Tool call streaming
Stream tool calls with parsed arguments:

from dedalus_labs import Dedalus
from dedalus_labs.lib._tools import pydantic_function_tool
from pydantic import BaseModel
from typing_extensions import Literal


class GetWeatherArgs(BaseModel):
    """Get the temperature for the given country/city combo"""

    city: str
    country: str
    units: Literal["c", "f"] = "c"


client = Dedalus(api_key="your-api-key")

# pydantic_function_tool() converts the Pydantic model into a tool
# definition the API can call.
with client.chat.completions.stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather in Edinburgh?"}],
    tools=[pydantic_function_tool(GetWeatherArgs)],
) as stream:
    for event in stream:
        if event.type == "tool_calls.function.arguments.done":
            # Tool call complete with parsed arguments
            print(f"Tool: {event.name}")
            print(f"Parsed args: {event.parsed_arguments}")

    completion = stream.get_final_completion()
    tool_call = completion.choices[0].message.tool_calls[0]

    # Access parsed Pydantic model
    args = tool_call.function.parsed_arguments
    print(f"City: {args.city}, Country: {args.country}")
Manual stream accumulation
For more control, manually accumulate stream chunks:

from dedalus_labs import Dedalus
from dedalus_labs.lib.streaming.chat import ChatCompletionStreamState

client = Dedalus(api_key="your-api-key")

# Accumulator that folds raw chunks into a running completion state.
state = ChatCompletionStreamState()

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Say hello there!"}],
    stream=True,
)

for chunk in response:
    state.handle_chunk(chunk)

    # Access current state
    # (the \r rewrites the same terminal line each iteration)
    current_content = state.current_content()
    print(f"\rCurrent: {current_content}", end="")

# Get final completion
completion = state.get_final_completion()
print(f"\nFinal: {completion.choices[0].message.content}")
Error handling in streams
Length limit errors
When using structured outputs, the SDK raises LengthFinishReasonError if the model hits token limits:
from dedalus_labs import Dedalus
from dedalus_labs._exceptions import LengthFinishReasonError
from pydantic import BaseModel


class Location(BaseModel):
    city: str
    temperature: float


client = Dedalus(api_key="your-api-key")

try:
    with client.chat.completions.stream(
        model="gpt-4",
        messages=[{"role": "user", "content": "Weather?"}],
        response_format=Location,
        max_tokens=1,  # Too low!
    ) as stream:
        # Drain the stream; the error surfaces while consuming it.
        for _ in stream:
            pass
except LengthFinishReasonError as e:
    # The truncated completion (including usage stats) is attached
    # to the exception.
    print(f"Hit token limit: {e.completion.usage}")
Connection errors
from dedalus_labs import Dedalus
from dedalus_labs import APIConnectionError, APITimeoutError

# Per-request timeout in seconds.
client = Dedalus(api_key="your-api-key", timeout=10.0)

try:
    with client.chat.completions.stream(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}],
    ) as stream:
        for chunk in stream:
            print(chunk.choices[0].delta.content or "", end="")
except APITimeoutError:
    # Catch the more specific timeout error before the general
    # connection error.
    print("Request timed out")
except APIConnectionError:
    print("Connection failed")
Streaming best practices
Always use context managers — the stream() method returns a context manager that ensures proper cleanup of network connections.

Flush output for real-time display — use flush=True in print() to see tokens as they arrive:

print(chunk.choices[0].delta.content, end="", flush=True)

Don't consume the stream multiple times — once a stream is exhausted, you must create a new request to stream again.
Complete streaming example
from dedalus_labs import Dedalus
from dedalus_labs._exceptions import LengthFinishReasonError, APIError
from pydantic import BaseModel
from typing_extensions import Literal


# Structured schema for the final parsed answer.
class WeatherResponse(BaseModel):
    location: str
    temperature: float
    units: Literal["celsius", "fahrenheit"]
    conditions: str


def stream_weather(city: str):
    """Stream a structured weather answer for *city*, printing deltas live.

    Returns the parsed WeatherResponse on success, or None when parsing
    fails or an API/length error occurs.
    """
    client = Dedalus(api_key="your-api-key")
    try:
        with client.chat.completions.stream(
            model="gpt-4",
            messages=[
                {"role": "user", "content": f"What's the weather in {city}?"}
            ],
            response_format=WeatherResponse,
        ) as stream:
            print("Streaming response...\n")
            for event in stream:
                if event.type == "content.delta":
                    print(event.delta, end="", flush=True)

            print("\n\nGetting final result...")
            completion = stream.get_final_completion()

            if completion.choices[0].message.parsed:
                weather = completion.choices[0].message.parsed
                print(f"\nLocation: {weather.location}")
                print(f"Temperature: {weather.temperature}°{weather.units}")
                print(f"Conditions: {weather.conditions}")
                return weather
            else:
                print("No structured output parsed")
                return None
    except LengthFinishReasonError as e:
        print(f"Response truncated: {e}")
        return None
    except APIError as e:
        print(f"API error: {e.message}")
        return None


if __name__ == "__main__":
    stream_weather("San Francisco")