Streaming Responses
Streaming enables real-time token-by-token responses from agents, providing immediate feedback to users and better UX for long-running operations.
Why Stream?
Benefits of streaming:
Immediate feedback - Users see responses as they’re generated
Better UX - The application feels faster even when total generation time is unchanged
Progress visibility - See tool calls and reasoning in real-time
Cancellation - Abort long-running operations
Lower perceived latency - Text can be shown as soon as the first token arrives, instead of after the full response is complete
Basic Streaming
From: packages/core/src/agent/agent.test.ts
Text Streaming
Stream plain text responses:
import { Agent } from '@mastra/core/agent';
const agent = new Agent({
  id: 'assistant',
  name: 'Assistant',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-4o',
});
// Start streaming
const stream = await agent.stream('Tell me a story about dragons');
// Consume text stream
for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}
// Get final text
const finalText = await stream.text;
console.log('\nComplete:', finalText);
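For a simple single-step response like this one, the final text should match what you get by concatenating every chunk from the text stream. The self-contained sketch below demonstrates that consumption pattern with a stand-in async generator in place of a real agent, so it runs without an API key. `fakeTextStream` and `collectText` are hypothetical helpers for illustration, not Mastra APIs.

```typescript
// Hypothetical stand-in for `stream.textStream`: yields text chunks one by one
async function* fakeTextStream(chunks: string[]): AsyncGenerator<string> {
  for (const chunk of chunks) {
    yield chunk;
  }
}

// Consume a text stream, forwarding each chunk to `onChunk`,
// and return the concatenated text once the stream ends
async function collectText(
  textStream: AsyncIterable<string>,
  onChunk: (chunk: string) => void = () => {},
): Promise<string> {
  let text = '';
  for await (const chunk of textStream) {
    onChunk(chunk);
    text += chunk;
  }
  return text;
}

// Usage: print chunks as they arrive, then the joined result
collectText(fakeTextStream(['Once ', 'upon ', 'a time']), (c) => console.log(c)).then(
  (finalText) => console.log('Complete:', finalText),
);
```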
Full Stream Events
Access all stream events including tool calls:
const stream = await agent.stream('What is the weather in NYC?');
for await (const event of stream.fullStream) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.textDelta);
      break;
    case 'tool-call':
      console.log(`\nCalling tool: ${event.toolName}`);
      console.log('Args:', event.args);
      break;
    case 'tool-result':
      console.log(`Tool ${event.toolName} completed`);
      console.log('Result:', event.result);
      break;
    case 'finish':
      console.log('\nStream finished');
      break;
  }
}
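Because every event carries a `type` field, a discriminated union lets TypeScript narrow each case so a branch can only read fields that exist on that variant. The types below are a hypothetical model that mirrors only the fields used in the loop above (`textDelta`, `toolName`, `args`, `result`); they are not Mastra's published event types, which carry more information.

```typescript
// Hypothetical event shapes, limited to the fields used in the loop above
type StreamEvent =
  | { type: 'text-delta'; textDelta: string }
  | { type: 'tool-call'; toolName: string; args: unknown }
  | { type: 'tool-result'; toolName: string; result: unknown }
  | { type: 'finish' };

interface StreamSummary {
  text: string;
  toolsCalled: string[];
  finished: boolean;
}

// Fold a sequence of events into a summary; the switch narrows `event`
function summarize(events: Iterable<StreamEvent>): StreamSummary {
  const summary: StreamSummary = { text: '', toolsCalled: [], finished: false };
  for (const event of events) {
    switch (event.type) {
      case 'text-delta':
        summary.text += event.textDelta;
        break;
      case 'tool-call':
        summary.toolsCalled.push(event.toolName);
        break;
      case 'tool-result':
        break; // results are not needed for this summary
      case 'finish':
        summary.finished = true;
        break;
    }
  }
  return summary;
}

const summary = summarize([
  { type: 'tool-call', toolName: 'weatherTool', args: { location: 'NYC' } },
  { type: 'tool-result', toolName: 'weatherTool', result: { temp: 72 } },
  { type: 'text-delta', textDelta: 'It is ' },
  { type: 'text-delta', textDelta: 'sunny.' },
  { type: 'finish' },
]);
console.log(summary);
```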
Stream Options
Basic Options
const stream = await agent.stream('Hello', {
  // Memory configuration
  memory: {
    resource: 'user-123',
    thread: { id: 'thread-456' },
  },
  // Model settings
  modelSettings: {
    temperature: 0.7,
    maxTokens: 1000,
  },
  // Maximum number of agentic steps (a step is one model call,
  // which may request tool calls)
  maxSteps: 5,
  // Request context for dynamic config
  // (`ctx` is a request context object created elsewhere)
  requestContext: ctx,
});
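Conceptually, `maxSteps` bounds an agent loop: each step asks the model for either a final answer or a tool call, and the loop ends when the model answers or the step budget runs out. The sketch below is a simplified, hypothetical model of that loop (`Step` and `runSteps` are illustrative names), not Mastra's internal implementation.

```typescript
// Hypothetical result of one model call: a final answer or a tool request
type Step =
  | { kind: 'answer'; text: string }
  | { kind: 'tool'; toolName: string };

// Run the loop until the model answers or `maxSteps` is reached
function runSteps(
  nextStep: (stepNumber: number) => Step,
  maxSteps: number,
): { steps: number; answer?: string } {
  for (let stepNumber = 1; stepNumber <= maxSteps; stepNumber++) {
    const step = nextStep(stepNumber);
    if (step.kind === 'answer') {
      return { steps: stepNumber, answer: step.text };
    }
    // In a real agent the tool would run here and its result
    // would be sent back to the model on the next step
  }
  return { steps: maxSteps }; // budget exhausted without a final answer
}

// A stand-in model that needs two tool calls before answering
const model = (n: number): Step =>
  n < 3 ? { kind: 'tool', toolName: 'search' } : { kind: 'answer', text: 'done' };

console.log(runSteps(model, 5)); // { steps: 3, answer: 'done' }
console.log(runSteps(model, 2)); // { steps: 2 }
```

With a budget of 2, the loop stops before the model ever produces an answer, which is why a too-small `maxSteps` can cut off tool-heavy tasks.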
Callbacks
React to stream events:
const stream = await agent.stream('Analyze this data', {
  // Called after each step completes
  onStepFinish: async ({ text, toolCalls, toolResults }) => {
    console.log('Step completed');
    console.log('Tool calls:', toolCalls.length);
  },
  // Called when stream finishes
  onFinish: async ({ text, finishReason }) => {
    console.log('Final text:', text);
    console.log('Finish reason:', finishReason);
  },
  // Called for each chunk
  onChunk: async ({ chunk }) => {
    // Process individual chunks
  },
  // Called on errors
  onError: async ({ error }) => {
    console.error('Stream error:', error);
  },
});
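Callbacks like these are a convenience layer over a stream you could also iterate yourself. The sketch below wraps any async iterable and fires `onChunk` per item, `onFinish` once the source is exhausted, and `onError` if it throws, which makes the call order easy to see. `withCallbacks` is a hypothetical helper; it is not how Mastra implements its callback options.

```typescript
interface Callbacks<T> {
  onChunk?: (chunk: T) => void | Promise<void>;
  onFinish?: (chunks: T[]) => void | Promise<void>;
  onError?: (error: unknown) => void | Promise<void>;
}

// Re-yield every chunk from `source`, invoking the callbacks along the way
async function* withCallbacks<T>(
  source: AsyncIterable<T>,
  { onChunk, onFinish, onError }: Callbacks<T>,
): AsyncGenerator<T> {
  const seen: T[] = [];
  try {
    for await (const chunk of source) {
      seen.push(chunk);
      await onChunk?.(chunk);
      yield chunk;
    }
  } catch (error) {
    await onError?.(error);
    throw error; // still surface the failure to the consumer
  }
  // Not reached if the consumer breaks out early: the generator is closed at the yield
  await onFinish?.(seen);
}

async function* letters(): AsyncGenerator<string> {
  yield 'a';
  yield 'b';
}

(async () => {
  const log: string[] = [];
  for await (const chunk of withCallbacks(letters(), {
    onChunk: (c) => { log.push(`chunk:${c}`); },
    onFinish: (all) => { log.push(`finish:${all.join('')}`); },
  })) {
    log.push(`consumed:${chunk}`);
  }
  console.log(log);
})();
```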
Streaming with Tools
From: examples/agent/src/mastra/agents/model-v2-agent.ts
import { Agent } from '@mastra/core/agent';
import { createTool } from '@mastra/core/tools';
import { z } from 'zod';
const weatherTool = createTool({
  id: 'get-weather',
  description: 'Get current weather',
  inputSchema: z.object({
    location: z.string(),
  }),
  // Stubbed implementation: returns the same weather for every location
  execute: async ({ location }) => {
    return { temp: 72, condition: 'sunny' };
  },
});
const agent = new Agent({
  id: 'weather-agent',
  name: 'Weather Agent',
  instructions: 'Help with weather information',
  model: 'openai/gpt-4o',
  tools: { weatherTool },
});
const stream = await agent.stream('What is the weather in San Francisco?');
for await (const event of stream.fullStream) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.textDelta);
  }
  if (event.type === 'tool-call') {
    console.log(`\n[Calling ${event.toolName}]`);
  }
  if (event.type === 'tool-result') {
    console.log(`[${event.toolName} completed]\n`);
  }
}
// Wait for completion
const result = await stream.text;
const toolCalls = await stream.toolCalls;
console.log('\nFinal result:', result);
console.log('Tools used:', toolCalls.length);
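`toolCalls.length` gives only the total. For a per-tool breakdown while events are still arriving, you can tally `tool-call` events as they stream in. A minimal sketch, assuming events shaped like the ones in the loop above (only `type` and `toolName` are read); `tallyToolCalls` and `sampleEvents` are hypothetical names, and the sample events stand in for a real `fullStream`.

```typescript
// Only the fields this sketch reads; real stream events carry more
interface ToolEvent {
  type: string;
  toolName?: string;
}

// Count how many times each tool was called
async function tallyToolCalls(
  events: AsyncIterable<ToolEvent>,
): Promise<Map<string, number>> {
  const counts = new Map<string, number>();
  for await (const event of events) {
    if (event.type === 'tool-call' && event.toolName) {
      counts.set(event.toolName, (counts.get(event.toolName) ?? 0) + 1);
    }
  }
  return counts;
}

// Stand-in event source
async function* sampleEvents(): AsyncGenerator<ToolEvent> {
  yield { type: 'tool-call', toolName: 'weatherTool' };
  yield { type: 'tool-result', toolName: 'weatherTool' };
  yield { type: 'tool-call', toolName: 'weatherTool' };
  yield { type: 'text-delta' };
}

tallyToolCalls(sampleEvents()).then((counts) => console.log(counts));
```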
Stream Abort
Cancel streaming operations:
const abortController = new AbortController();
const stream = await agent.stream('Generate a long document', {
  abortSignal: abortController.signal,
  onAbort: async () => {
    console.log('Stream aborted by user');
  },
});
// Cancel after 5 seconds
const timer = setTimeout(() => {
  abortController.abort();
}, 5000);
try {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
} catch (error) {
  if (error instanceof Error && error.name === 'AbortError') {
    console.log('\nStream was cancelled');
  } else {
    // Re-throw anything that is not a cancellation
    throw error;
  }
} finally {
  // Don't leave the timer pending if the stream finished first
  clearTimeout(timer);
}
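Cancellation relies on the standard AbortController/AbortSignal pair: calling `abort()` sets `signal.aborted` to `true` and dispatches an `abort` event to listeners. The self-contained sketch below shows a consumer that checks the signal between chunks and keeps the partial text instead of throwing. `readUntilAborted` and `tokens` are hypothetical names, and the stand-in source calls `abort()` itself to simulate a user pressing "Stop".

```typescript
// Read text chunks until the source ends or `signal` is aborted,
// keeping whatever text arrived before cancellation
async function readUntilAborted(
  source: AsyncIterable<string>,
  signal: AbortSignal,
): Promise<{ text: string; aborted: boolean }> {
  let text = '';
  for await (const chunk of source) {
    if (signal.aborted) {
      // Returning from inside for-await also closes the source iterator
      return { text, aborted: true };
    }
    text += chunk;
  }
  return { text, aborted: false };
}

const controller = new AbortController();
controller.signal.addEventListener('abort', () => console.log('abort event fired'));

// Stand-in source that calls abort() before its third chunk
async function* tokens(): AsyncGenerator<string> {
  const words = ['one ', 'two ', 'three ', 'four '];
  for (const [i, word] of words.entries()) {
    if (i === 2) controller.abort();
    yield word;
  }
}

readUntilAborted(tokens(), controller.signal).then((result) => console.log(result));
```

A loop like this only notices cancellation when the next chunk arrives. Passing `abortSignal` to the agent, as in the example above, is what lets the underlying model request be cancelled as well.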
Streaming with Memory
import { Agent } from '@mastra/core/agent';
import { Memory } from '@mastra/memory';
const agent = new Agent({
  id: 'chat-agent',
  name: 'Chat Agent',
  instructions: 'You are a conversational assistant',
  model: 'openai/gpt-4o',
  memory: new Memory(),
});
// First message
const stream1 = await agent.stream('My name is Alice', {
  memory: {
    resource: 'user-123',
    thread: { id: 'conversation-1' },
  },
});
for await (const chunk of stream1.textStream) {
  process.stdout.write(chunk);
}
// Second message: same resource and thread, so the agent remembers the context
const stream2 = await agent.stream('What is my name?', {
  memory: {
    resource: 'user-123',
    thread: { id: 'conversation-1' },
  },
});
for await (const chunk of stream2.textStream) {
  process.stdout.write(chunk);
}
// Example output (exact wording varies by model): "Your name is Alice."
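The second call can answer because messages are stored per thread, and the resource identifies who owns that thread. The in-memory sketch below illustrates that scoping with a `Map` keyed by resource and thread ID. `ThreadStore` is a hypothetical class for illustration only; Mastra's `Memory` persists to a storage backend and handles much more than this.

```typescript
interface ChatMessage {
  role: 'user' | 'assistant';
  content: string;
}

// Hypothetical in-memory store: one message list per (resource, thread) pair
class ThreadStore {
  private threads = new Map<string, ChatMessage[]>();

  private key(resource: string, threadId: string): string {
    return `${resource}::${threadId}`;
  }

  append(resource: string, threadId: string, message: ChatMessage): void {
    const k = this.key(resource, threadId);
    const history = this.threads.get(k) ?? [];
    history.push(message);
    this.threads.set(k, history);
  }

  // Copy of the prior messages that would be sent to the model as context
  history(resource: string, threadId: string): ChatMessage[] {
    return [...(this.threads.get(this.key(resource, threadId)) ?? [])];
  }
}

const store = new ThreadStore();
store.append('user-123', 'conversation-1', { role: 'user', content: 'My name is Alice' });
store.append('user-123', 'conversation-1', { role: 'assistant', content: 'Nice to meet you, Alice!' });

// Same thread: earlier messages are available as context
console.log(store.history('user-123', 'conversation-1').length); // 2
// Different thread: starts empty
console.log(store.history('user-123', 'conversation-2').length); // 0
```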
Advanced Patterns
Progress Tracking
From: examples/agent/src/mastra/agents/model-v2-agent.ts
const stream = await agent.stream('Analyze this dataset', {
  maxSteps: 10,
  onIterationComplete: async ({ iteration, maxIterations, toolCalls, text }) => {
    const progress = maxIterations ? (iteration / maxIterations) * 100 : 0;
    console.log(`Progress: ${progress.toFixed(0)}%`);
    console.log(`Iteration ${iteration}: Called ${toolCalls.length} tools`);
    // Optionally control continuation
    if (text.includes('COMPLETE')) {
      return { continue: false };
    }
    return { continue: true };
  },
});
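The progress figure in `onIterationComplete` is a simple ratio. Pulled out into a standalone function it is easy to test, and clamping keeps the display from exceeding 100% if the iteration count ever passes the limit. `progressPercent` is a hypothetical helper; returning 0 when no limit is known matches the `: 0` fallback above.

```typescript
// Percentage of the iteration budget used, rounded and clamped to 0-100.
// Returns 0 when no limit is known, matching the fallback above.
function progressPercent(iteration: number, maxIterations?: number): number {
  if (!maxIterations || maxIterations <= 0) return 0;
  const pct = (iteration / maxIterations) * 100;
  return Math.round(Math.min(100, Math.max(0, pct)));
}

console.log(progressPercent(3, 10)); // 30
console.log(progressPercent(12, 10)); // 100
console.log(progressPercent(2)); // 0
```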
Tool Approval
Require approval for sensitive tools:
import * as fs from 'node:fs/promises';
import { z } from 'zod';
import { Agent } from '@mastra/core/agent';
import { createTool } from '@mastra/core/tools';
const deleteFileTool = createTool({
  id: 'delete-file',
  description: 'Delete a file',
  requireApproval: true,
  inputSchema: z.object({
    path: z.string(),
  }),
  execute: async ({ path }) => {
    await fs.unlink(path);
    return { deleted: true };
  },
});
const agent = new Agent({
  id: 'file-agent',
  name: 'File Agent',
  instructions: 'Manage files',
  model: 'openai/gpt-4o',
  tools: { deleteFileTool },
});
const stream = await agent.stream('Delete old logs', {
  requireToolApproval: true,
});
for await (const event of stream.fullStream) {
  if (event.type === 'tool-call' && event.approval) {
    console.log(`Approve deletion of ${event.args.path}? (y/n)`);
    // getUserInput() is a placeholder for your own prompt logic (e.g. readline)
    const answer = await getUserInput();
    if (answer === 'y') {
      await event.approval.approve();
    } else {
      await event.approval.reject('User declined');
    }
  }
  if (event.type === 'text-delta') {
    process.stdout.write(event.textDelta);
  }
}
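At its core, an approval flow pauses a tool call on a promise that the user resolves later. The sketch below models that with a small deferred-promise "gate": the guarded action waits on `decision`, and the UI calls `approve()` or `reject(reason)`. `createApprovalGate`, `runIfApproved`, and `Decision` are hypothetical names for illustration; they are not the `event.approval` object used above.

```typescript
type Decision = { approved: true } | { approved: false; reason: string };

interface ApprovalGate {
  decision: Promise<Decision>;
  approve: () => void;
  reject: (reason: string) => void;
}

// A deferred promise: only the first decision counts, later calls are ignored
function createApprovalGate(): ApprovalGate {
  let settle!: (d: Decision) => void;
  const decision = new Promise<Decision>((resolve) => {
    settle = resolve;
  });
  return {
    decision,
    approve: () => settle({ approved: true }),
    reject: (reason) => settle({ approved: false, reason }),
  };
}

// Run `action` only if the gate is approved
async function runIfApproved(gate: ApprovalGate, action: () => string): Promise<string> {
  const d = await gate.decision;
  return d.approved ? action() : `skipped: ${d.reason}`;
}

const gate = createApprovalGate();
runIfApproved(gate, () => 'deleted old.log').then(console.log);
gate.reject('User declined'); // prints "skipped: User declined"
```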
Multi-Step Operations
const stream = await agent.stream(
  'Research quantum computing and create a report',
  {
    maxSteps: 15,
    onStepFinish: async ({ stepNumber, toolCalls, toolResults }) => {
      console.log(`\n--- Step ${stepNumber} ---`);
      toolCalls.forEach((call, i) => {
        console.log(`Tool: ${call.toolName}`);
        console.log(`Result: ${JSON.stringify(toolResults[i]?.result)}`);
      });
    },
  },
);
for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}
const finalResult = await stream.text;
console.log('\n\nFinal report:', finalResult);
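The logging in `onStepFinish` pairs each tool call with the result at the same array index. If results might not line up by position, matching by a call ID is more robust. The sketch below assumes each call and result carries a `toolCallId` field; that field name is an assumption here, so check the stream API reference for the exact shape. `formatStep` is a hypothetical helper.

```typescript
// Assumed shapes: only the fields this sketch reads
interface ToolCall {
  toolCallId: string;
  toolName: string;
}
interface ToolResult {
  toolCallId: string;
  result: unknown;
}

// Build one log line per call, looking up its result by ID rather than by index
function formatStep(stepNumber: number, calls: ToolCall[], results: ToolResult[]): string[] {
  const byId = new Map(results.map((r) => [r.toolCallId, r.result] as const));
  const lines = [`--- Step ${stepNumber} ---`];
  for (const call of calls) {
    const result = byId.has(call.toolCallId)
      ? JSON.stringify(byId.get(call.toolCallId))
      : '(pending)';
    lines.push(`${call.toolName}: ${result}`);
  }
  return lines;
}

const lines = formatStep(
  2,
  [
    { toolCallId: 'a', toolName: 'search' },
    { toolCallId: 'b', toolName: 'fetchPage' },
  ],
  // Results listed in a different order than the calls
  [
    { toolCallId: 'b', result: { status: 200 } },
    { toolCallId: 'a', result: ['paper1'] },
  ],
);
console.log(lines.join('\n'));
```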
Server-Sent Events (SSE)
Stream responses to web clients:
import express from 'express';
import { Agent } from '@mastra/core/agent';
const agent = new Agent({
  id: 'chat',
  name: 'Chat Agent',
  instructions: 'You are helpful',
  model: 'openai/gpt-4o',
});
const app = express();
app.use(express.json()); // parse JSON request bodies into req.body
// Express/Node.js endpoint
app.post('/api/chat', async (req, res) => {
  const { message } = req.body;
  // Convert to SSE
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  try {
    const stream = await agent.stream(message);
    // One SSE frame per event: "data: <json>" followed by a blank line
    for await (const event of stream.fullStream) {
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    }
  } catch (error) {
    // Report failures to the client as a named SSE event
    res.write(`event: error\ndata: ${JSON.stringify({ message: String(error) })}\n\n`);
  } finally {
    res.end();
  }
});
app.listen(3000); // example port
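On the wire, each SSE message is one or more `data:` lines terminated by a blank line, with an optional `event:` line naming the event type. `JSON.stringify` never emits raw newlines, so one `data:` line per event is enough. The sketch below pairs an encoder with a minimal parser for complete frames, which can help when testing an endpoint like the one above. `encodeSSE` and `parseSSE` are hypothetical helpers; the parser does not handle frames split across network reads. Browsers' built-in `EventSource` only issues GET requests, so a POST endpoint like this one is typically read with `fetch` and parsed manually.

```typescript
// Encode one event as an SSE frame, optionally with a named event type
function encodeSSE(data: unknown, eventName?: string): string {
  const header = eventName ? `event: ${eventName}\n` : '';
  return `${header}data: ${JSON.stringify(data)}\n\n`;
}

// Parse complete SSE frames (separated by a blank line).
// Multiple data: lines in one frame are joined with "\n", as the SSE spec requires.
function parseSSE(text: string): { event: string; data: unknown }[] {
  const messages: { event: string; data: unknown }[] = [];
  for (const frame of text.split('\n\n')) {
    if (!frame.trim()) continue;
    let event = 'message'; // default event type per the SSE spec
    const dataLines: string[] = [];
    for (const line of frame.split('\n')) {
      // A single space after the colon is not part of the value
      if (line.startsWith('event:')) event = line.slice(6).replace(/^ /, '');
      else if (line.startsWith('data:')) dataLines.push(line.slice(5).replace(/^ /, ''));
    }
    if (dataLines.length > 0) {
      messages.push({ event, data: JSON.parse(dataLines.join('\n')) });
    }
  }
  return messages;
}

const wire =
  encodeSSE({ type: 'text-delta', textDelta: 'Hi' }) +
  encodeSSE({ message: 'boom' }, 'error');
console.log(parseSSE(wire));
```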
WebSocket Streaming
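import { WebSocketServer } from 'ws';
// Assumes `agent` is an Agent instance like the ones defined above
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
  ws.on('message', async (message) => {
    try {
      // Messages arrive as raw data (e.g. a Buffer); decode before parsing
      const { text } = JSON.parse(message.toString());
      const stream = await agent.stream(text);
      for await (const event of stream.fullStream) {
        ws.send(JSON.stringify({
          type: event.type,
          data: event,
        }));
      }
      // Tell the client this response is complete
      ws.send(JSON.stringify({ type: 'done' }));
    } catch (error) {
      ws.send(JSON.stringify({ type: 'error', message: String(error) }));
    }
  });
});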
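Because anything can arrive on a socket, it helps to validate incoming messages before passing them to the agent. The sketch below describes the small JSON protocol used in this example (the client sends `{ text }`) and a parser that returns `null` for malformed input instead of throwing. `ClientMessage` and `parseClientMessage` are hypothetical names describing this example's protocol, not a Mastra API.

```typescript
// Client -> server: a single prompt
interface ClientMessage {
  text: string;
}

// Parse and validate a raw client message; returns null instead of throwing
function parseClientMessage(raw: string): ClientMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof value !== 'object' || value === null) return null;
  const text = (value as Record<string, unknown>).text;
  // Require a non-empty string prompt
  return typeof text === 'string' && text.trim() !== '' ? { text } : null;
}

console.log(parseClientMessage('{"text":"Hello"}')); // { text: 'Hello' }
console.log(parseClientMessage('not json')); // null
console.log(parseClientMessage('{"text":42}')); // null
```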
React Integration
import { useChat } from '@mastra/react';
function ChatComponent() {
  const { messages, input, setInput, sendMessage, isLoading } = useChat({
    api: '/api/chat',
    onChunk: (chunk) => {
      // Handle streaming chunks
      console.log('Chunk:', chunk);
    },
  });
  return (
    <div>
      <div>
        {messages.map((msg, i) => (
          <div key={i}>
            <strong>{msg.role}:</strong> {msg.content}
          </div>
        ))}
      </div>
      <input
        value={input}
        onChange={(e) => setInput(e.target.value)}
        onKeyDown={(e) => {
          if (e.key === 'Enter') sendMessage();
        }}
        disabled={isLoading}
      />
      {isLoading && <div>Streaming...</div>}
    </div>
  );
}
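Whatever hook manages the state, the core update during streaming is the same: append each text delta to the assistant message currently being built, or start a new one. The pure function below sketches that update in a form usable with React's `useState` or `useReducer`. `UIMessage` and `appendDelta` are hypothetical names, and the message shape is simplified to `role` and `content` as in the component above.

```typescript
interface UIMessage {
  role: 'user' | 'assistant';
  content: string;
}

// Return a new array (never mutate React state) with `delta` appended
// to the trailing assistant message, creating one if needed
function appendDelta(messages: UIMessage[], delta: string): UIMessage[] {
  const last = messages[messages.length - 1];
  if (last && last.role === 'assistant') {
    return [...messages.slice(0, -1), { ...last, content: last.content + delta }];
  }
  return [...messages, { role: 'assistant', content: delta }];
}

let state: UIMessage[] = [{ role: 'user', content: 'Hi' }];
for (const delta of ['Hel', 'lo', '!']) {
  state = appendDelta(state, delta);
}
console.log(state);
```

With a `useState` setter (for example, a hypothetical `setMessages`), use the functional form `setMessages((prev) => appendDelta(prev, delta))` so each update builds on the latest state rather than a stale copy.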
Best Practices
Always wrap streaming in try-catch:
try {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
} catch (error) {
  console.error('Streaming failed:', error);
  // Show fallback UI
}
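When a stream fails midway, the text received before the error is often still worth showing. A variation on the pattern above collects chunks and returns both the partial text and the error, leaving the caller to decide how to render them. `consumeSafely` and `flakyStream` are hypothetical names; the stand-in stream fails after two chunks to simulate a dropped connection.

```typescript
interface ConsumeResult {
  text: string;
  error?: unknown;
}

// Collect chunks; on failure, keep what arrived and report the error
async function consumeSafely(textStream: AsyncIterable<string>): Promise<ConsumeResult> {
  let text = '';
  try {
    for await (const chunk of textStream) {
      text += chunk;
    }
    return { text };
  } catch (error) {
    return { text, error };
  }
}

// Stand-in stream that fails partway through
async function* flakyStream(): AsyncGenerator<string> {
  yield 'Partial ';
  yield 'answer';
  throw new Error('connection reset');
}

consumeSafely(flakyStream()).then(({ text, error }) => {
  console.log('Received:', text);
  if (error) console.error('Stream failed:', error);
});
```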
Use abort signals for cancellation
Allow users to cancel long operations:
const controller = new AbortController();
const stream = await agent.stream(prompt, {
  abortSignal: controller.signal,
});
// Cancel button (`button` is a DOM button and `prompt` the user's input, both defined elsewhere)
button.onclick = () => controller.abort();
Buffer partial tokens for better UX
Accumulate tokens before displaying:
let buffer = '';
for await (const chunk of stream.textStream) {
  buffer += chunk;
  // Display every 5 chars or on sentence-ending punctuation
  // (`display` stands in for your own rendering function)
  if (buffer.length >= 5 || /[.!?]/.test(chunk)) {
    display(buffer);
    buffer = '';
  }
}
// Flush any remaining text once the stream ends
if (buffer) display(buffer);
Show streaming indicators:
setStatus('streaming');
try {
  for await (const chunk of stream.textStream) {
    appendToUI(chunk);
  }
  setStatus('complete');
} catch (error) {
  setStatus('error');
}
Next Steps
Structured Output - Get typed, validated responses from agents
Memory - Add conversation persistence
Network - Orchestrate multi-agent workflows
Stream API Reference - Complete streaming API documentation