Streaming enables real-time display of LLM outputs as they’re generated, creating responsive user experiences. Instead of waiting for complete responses, users see text appear progressively.
Why Stream?
Better UX: Users see the start of a response almost immediately instead of waiting 10+ seconds for the full text
Lower latency: The first token arrives much sooner than a complete response would
Transparency: Users can stop generation early if the answer is already sufficient
Long outputs: Long responses arrive incrementally, which helps avoid request timeouts
Basic Streaming
All LangChain chat models support streaming via the stream() method:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

# Stream response chunks
for chunk in model.stream("Write a short story about a robot"):
    print(chunk.content, end="", flush=True)

# Output appears progressively:
# Once
# Once upon
# Once upon a
# Once upon a time
# ...
Message Chunks
Streaming returns AIMessageChunk objects:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

for chunk in model.stream("Tell me a joke"):
    print(f"Chunk: {chunk.content!r}")
    print(f"Type: {type(chunk)}")
    print(f"ID: {chunk.id}\n")

# Output:
# Chunk: 'Why'
# Type: <class 'langchain_core.messages.ai.AIMessageChunk'>
# ID: msg_abc123
#
# Chunk: ' did'
# Type: <class 'langchain_core.messages.ai.AIMessageChunk'>
# ID: msg_abc123
Async Streaming
Use async streaming for concurrent operations:
import asyncio
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

async def stream_response():
    async for chunk in model.astream("Explain quantum computing"):
        print(chunk.content, end="", flush=True)

# Run the coroutine from a script.
# In a notebook or another running event loop, use `await stream_response()` instead.
asyncio.run(stream_response())
Streaming Multiple Queries
import asyncio
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

async def stream_query(query: str, prefix: str):
    print(f"\n{prefix}: ", end="")
    async for chunk in model.astream(query):
        print(chunk.content, end="", flush=True)

async def stream_multiple():
    # Stream multiple queries concurrently.
    # All three streams write to the same stdout, so their chunks interleave.
    await asyncio.gather(
        stream_query("What is Python?", "Q1"),
        stream_query("What is JavaScript?", "Q2"),
        stream_query("What is Rust?", "Q3"),
    )

# In a notebook or another running event loop, use `await stream_multiple()` instead.
asyncio.run(stream_multiple())
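Because concurrent streams share one terminal, printing chunks as they arrive mixes the answers together. The following is a minimal sketch of one way around that: collect each stream into its own buffer and print the labeled results once all of them finish. It uses only the standard library; `fake_astream` is a hypothetical stand-in for `model.astream()` so the example runs without an API key.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_astream(text: str, delay: float = 0.001) -> AsyncIterator[str]:
    """Hypothetical stand-in for model.astream(): yields one word at a time."""
    for word in text.split(" "):
        await asyncio.sleep(delay)
        yield word + " "


async def collect(prefix: str, text: str) -> str:
    """Consume one stream fully and return its text with a label."""
    parts = []
    async for chunk in fake_astream(text):
        parts.append(chunk)
    return f"{prefix}: {''.join(parts).strip()}"


async def collect_all() -> List[str]:
    # gather() runs the streams concurrently and returns results in argument order
    return await asyncio.gather(
        collect("Q1", "Python is a language"),
        collect("Q2", "Rust is a language"),
    )


results = asyncio.run(collect_all())
print("\n".join(results))
```

The trade-off is that nothing is shown until every stream completes; a UI with one output area per query can keep the progressive display instead.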
Streaming Chains
Stream through LCEL chains:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])
parser = StrOutputParser()

# Build chain
chain = prompt | model | parser

# Stream through entire chain; the parser yields plain strings
for chunk in chain.stream({"question": "What is LangChain?"}):
    print(chunk, end="", flush=True)
Streaming with Multiple Steps
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4")

# Multi-step chain
chain = (
    {"topic": RunnablePassthrough()}
    | ChatPromptTemplate.from_template("Tell me about {topic}")
    | model
    | StrOutputParser()
)

# Stream final output
for chunk in chain.stream("machine learning"):
    print(chunk, end="", flush=True)
Streaming Events
Get granular control with astream_events():
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Write about {topic}")
chain = prompt | model

async def stream_with_events():
    async for event in chain.astream_events(
        {"topic": "AI"},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # Model streaming chunks
            content = event["data"]["chunk"].content
            if content:
                print(content, end="", flush=True)
        elif kind == "on_chat_model_start":
            print("Model started...")
        elif kind == "on_chat_model_end":
            print("\nModel finished!")

# In a notebook or another running event loop, use `await stream_with_events()` instead.
asyncio.run(stream_with_events())
Event Types
on_chat_model_stream
Individual token/chunk from model:
if event["event"] == "on_chat_model_stream":
    chunk = event["data"]["chunk"]
    print(chunk.content, end="")

on_chat_model_start
Model invocation started:
if event["event"] == "on_chat_model_start":
    print("Generating response...")

on_chat_model_end
Model invocation completed:
if event["event"] == "on_chat_model_end":
    output = event["data"]["output"]
    print(f"\nTotal tokens: {output.usage_metadata}")

on_chain_stream
Chain component streaming:
if event["event"] == "on_chain_stream":
    print(f"Chain output: {event['data']}")
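When several event types need handling, a dispatch table can be easier to extend than a growing if/elif chain. The sketch below assumes only the event shape shown above (a dict with `"event"` and `"data"` keys). `fake_events` is a hypothetical stand-in for `chain.astream_events(..., version="v2")`, and `SimpleNamespace` stands in for the message chunk, so the example runs without a model.

```python
import asyncio
from types import SimpleNamespace


async def fake_events():
    """Hypothetical stand-in for chain.astream_events(..., version="v2")."""
    yield {"event": "on_chat_model_start", "data": {}}
    for token in ["Hel", "lo"]:
        yield {
            "event": "on_chat_model_stream",
            "data": {"chunk": SimpleNamespace(content=token)},
        }
    yield {"event": "on_chat_model_end", "data": {}}
    yield {"event": "on_chain_stream", "data": {}}  # no handler registered: ignored


async def dispatch(events) -> dict:
    state = {"text": "", "log": []}

    def on_start(event):
        state["log"].append("start")

    def on_stream(event):
        state["text"] += event["data"]["chunk"].content

    def on_end(event):
        state["log"].append("end")

    # Map event names to handlers; unknown events fall through silently
    handlers = {
        "on_chat_model_start": on_start,
        "on_chat_model_stream": on_stream,
        "on_chat_model_end": on_end,
    }
    async for event in events:
        handler = handlers.get(event["event"])
        if handler is not None:
            handler(event)
    return state


state = asyncio.run(dispatch(fake_events()))
print(state)
```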
Streaming RAG
Stream retrieval-augmented generation:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Setup vector store
vectorstore = InMemoryVectorStore.from_texts(
    [
        "LangChain is a framework for LLM applications",
        "Streaming provides real-time responses",
        "RAG combines retrieval with generation",
    ],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

# Create RAG chain
prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer using context: {context}"),
    ("human", "{question}"),
])
model = ChatOpenAI(model="gpt-4")

rag_chain = (
    {
        # Join the retrieved documents into a single context string
        "context": retriever | (lambda docs: "\n".join([d.page_content for d in docs])),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

# Stream RAG output
for chunk in rag_chain.stream("What is LangChain?"):
    print(chunk, end="", flush=True)
Stream tool calls from a model with bound tools:
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

@tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Sunny, 72°F in {location}"

model = ChatOpenAI(model="gpt-4")
model_with_tools = model.bind_tools([get_weather])

# Stream response with tool calls
for chunk in model_with_tools.stream("What's the weather in Paris?"):
    # Tool calls arrive as partial fragments spread across chunks
    if chunk.tool_call_chunks:
        print(f"\nTool call: {chunk.tool_call_chunks}")
    elif chunk.content:
        print(chunk.content, end="", flush=True)
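Each printed fragment typically holds only part of a tool call, so the arguments have to be reassembled before the tool can run. The sketch below shows the idea with plain dictionaries, assuming each fragment carries `name`, `args`, `id`, and `index` keys and that `args` arrives as pieces of a JSON string. The fragment values and the `merge_tool_call_chunks` helper are hypothetical, written for illustration.

```python
import json

# Hypothetical fragments shaped like the entries of chunk.tool_call_chunks
fragments = [
    {"name": "get_weather", "args": "", "id": "call_1", "index": 0},
    {"name": None, "args": '{"loc', "id": None, "index": 0},
    {"name": None, "args": 'ation": "Paris"}', "id": None, "index": 0},
]


def merge_tool_call_chunks(chunks):
    """Group fragments by index, concatenate their args, then parse the JSON."""
    merged = {}
    for fragment in chunks:
        entry = merged.setdefault(
            fragment["index"], {"name": None, "id": None, "args": ""}
        )
        entry["name"] = entry["name"] or fragment.get("name")
        entry["id"] = entry["id"] or fragment.get("id")
        entry["args"] += fragment.get("args") or ""
    return [
        {"name": e["name"], "id": e["id"], "args": json.loads(e["args"] or "{}")}
        for _, e in sorted(merged.items())
    ]


calls = merge_tool_call_chunks(fragments)
print(calls)
```

Adding the streamed `AIMessageChunk` objects together with `+` should perform an equivalent merge, which avoids hand-written accumulation in real code.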
Token Usage Tracking
Track tokens while streaming:
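from langchain_openai import ChatOpenAI

# stream_usage=True asks the API to report token counts on the streamed chunks
# (requires a recent langchain-openai release)
model = ChatOpenAI(model="gpt-4", stream_usage=True)

full_message = None
for chunk in model.stream("Write a haiku about coding"):
    print(chunk.content, end="", flush=True)
    # Adding chunks together merges their content and usage metadata
    full_message = chunk if full_message is None else full_message + chunk

# Read usage from the accumulated message instead of invoking the model a second time
if full_message is not None and full_message.usage_metadata:
    print(f"\n\nTokens used: {full_message.usage_metadata['total_tokens']}")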
Custom Stream Processing
Process chunks with custom logic:
from langchain_openai import ChatOpenAI
import re

model = ChatOpenAI(model="gpt-4")

def process_stream(query: str):
    """Stream with word counting."""
    word_count = 0
    buffer = ""
    for chunk in model.stream(query):
        content = chunk.content
        print(content, end="", flush=True)
        buffer += content
        # Split on whitespace; the last piece may be an unfinished word
        parts = re.split(r"\s+", buffer)
        word_count += sum(1 for part in parts[:-1] if part)
        buffer = parts[-1]  # Keep the unfinished word in the buffer
    # Count final word
    if buffer:
        word_count += 1
    print(f"\n\nTotal words: {word_count}")

process_stream("Write a paragraph about Python")
Buffering Strategies
Common strategies are word buffering, sentence buffering, and time-based buffering. Word buffering:
def stream_by_word(chain, input_data):
    """Buffer and output complete words."""
    buffer = ""
    for chunk in chain.stream(input_data):
        buffer += chunk
        # Output complete words
        while ' ' in buffer:
            word, buffer = buffer.split(' ', 1)
            print(word, end=" ", flush=True)
    # Output remaining
    if buffer:
        print(buffer, flush=True)
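The sentence and time-based strategies follow the same pattern as the word example. The sketch below is one possible version of each, written as generators over any iterable of string chunks (for example, the output of a chain ending in `StrOutputParser`). The function names, the sentence-ending regex, and the injectable `clock` parameter are illustrative assumptions, not part of LangChain.

```python
import re
import time
from typing import Callable, Iterable, Iterator


def stream_by_sentence(chunks: Iterable[str]) -> Iterator[str]:
    """Yield complete sentences: text ending in . ! or ? followed by whitespace."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while True:
            match = re.search(r"[.!?]\s+", buffer)
            if match is None:
                break
            yield buffer[: match.end()].strip()
            buffer = buffer[match.end():]
    # Flush whatever is left once the stream ends
    if buffer.strip():
        yield buffer.strip()


def stream_by_interval(
    chunks: Iterable[str],
    interval: float = 0.1,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[str]:
    """Yield accumulated text at most once per `interval` seconds."""
    buffer = ""
    last_flush = clock()
    for chunk in chunks:
        buffer += chunk
        now = clock()
        # Flushing is only checked when a chunk arrives
        if now - last_flush >= interval:
            yield buffer
            buffer = ""
            last_flush = now
    if buffer:
        yield buffer


sentences = list(stream_by_sentence(["Hello wor", "ld. How", " are you? Fine"]))
print(sentences)
```

Sentence buffering suits text-to-speech or translation pipelines that need whole sentences, while time-based buffering limits how often a UI re-renders.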
Error Handling
Handle streaming errors gracefully:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

try:
    for chunk in model.stream("Generate text"):
        print(chunk.content, end="", flush=True)
except Exception as e:
    print(f"\nStreaming error: {e}")
    # Fallback to non-streaming
    response = model.invoke("Generate text")
    print(response.content)
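One subtlety with this fallback is that a stream can fail after part of the text has already been displayed, so the fallback response repeats content the user has seen. The sketch below marks the interruption before emitting the fallback. It is generic over any chunk source; `stream_with_fallback` and `flaky_stream` are hypothetical names standing in for a wrapper around `model.stream()` and `model.invoke()`.

```python
from typing import Callable, Iterable, Iterator


def stream_with_fallback(
    stream_fn: Callable[[], Iterable[str]],
    fallback_fn: Callable[[], str],
) -> Iterator[str]:
    """Yield streamed chunks; on failure, yield a marker and the full fallback text."""
    emitted = False
    try:
        for chunk in stream_fn():
            emitted = True
            yield chunk
    except Exception as exc:
        if emitted:
            # Partial text was already shown, so make the restart visible
            yield f"\n[stream interrupted: {exc}]\n"
        yield fallback_fn()


def flaky_stream() -> Iterator[str]:
    """Simulated stream that drops the connection after one chunk."""
    yield "Hello"
    raise ConnectionError("connection dropped")


output = list(stream_with_fallback(flaky_stream, lambda: "Hello world"))
print("".join(output))
```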
Streaming with Callbacks
Use callbacks for side effects:
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StreamingStdOutCallbackHandler

# Built-in streaming callback
model = ChatOpenAI(
    model="gpt-4",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
)

# Automatically streams to stdout
model.invoke("Tell me a story")
Custom Callback
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import BaseCallbackHandler

class CustomStreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.tokens = []

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Handle each new token."""
        self.tokens.append(token)
        print(f"[{len(self.tokens)}]{token}", end="", flush=True)

# Use custom handler
handler = CustomStreamHandler()
model = ChatOpenAI(model="gpt-4", streaming=True, callbacks=[handler])
model.invoke("Write a haiku")
# Counts the chunks received by the callback
print(f"\n\nTotal tokens: {len(handler.tokens)}")
Best Practices
Use async for concurrency
Process multiple streams concurrently with astream() and asyncio.gather().
Buffer appropriately
Choose buffering strategy based on use case (word, sentence, or time-based).
Handle errors gracefully
Wrap streaming in try-except and provide fallback to non-streaming.
Track usage metadata
Monitor token usage even when streaming for cost tracking.
Set appropriate timeouts
Configure timeouts to handle slow or stalled streams.
Test with slow connections
Ensure streaming works well with variable network conditions.
Use astream() over stream() for async contexts
Buffer chunks for smoother display (word or sentence level)
Set temperature=0 for more deterministic output (it does not make streaming faster)
Use smaller models (gpt-4o-mini) for lower latency
Enable streaming callbacks for automatic handling
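The timeout advice above can be applied per chunk rather than to the whole request, so that a stream which stalls midway is detected quickly. The following is a minimal sketch using only `asyncio`; `with_chunk_timeout` is a hypothetical helper that could wrap any async iterator such as `model.astream(...)`, and `stalling_stream` simulates a stalled connection.

```python
import asyncio
from typing import AsyncIterator, List


async def with_chunk_timeout(
    stream: AsyncIterator[str], timeout: float
) -> AsyncIterator[str]:
    """Re-yield chunks, raising asyncio.TimeoutError if the next one is too slow."""
    iterator = stream.__aiter__()
    while True:
        try:
            chunk = await asyncio.wait_for(iterator.__anext__(), timeout)
        except StopAsyncIteration:
            return
        yield chunk


async def stalling_stream() -> AsyncIterator[str]:
    """Simulated stream that stalls after its first chunk."""
    yield "first"
    await asyncio.sleep(1.0)
    yield "never delivered"


async def main() -> List[str]:
    received = []
    try:
        async for chunk in with_chunk_timeout(stalling_stream(), timeout=0.05):
            received.append(chunk)
    except asyncio.TimeoutError:
        received.append("<timed out>")
    return received


received = asyncio.run(main())
print(received)
```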