Overview
The TamboStream class handles streaming AI responses with tool execution. It provides two consumption modes:
- AsyncIterable:
for await (const { event, snapshot } of stream) { ... }
- Promise:
const thread = await stream.thread
The internal processing loop always runs automatically (fire-and-forget in constructor). The .thread promise resolves when the loop completes.
Installation
npm install @tambo-ai/client
Usage
You typically don’t construct TamboStream directly—it’s returned from TamboClient.run().
import { TamboClient } from "@tambo-ai/client";
const client = new TamboClient({ apiKey: "your-api-key" });
const stream = client.run("Hello, AI!");
Consumption Modes
Async Iteration (Stream Mode)
Iterate over events as they arrive. Each iteration yields an event and a thread snapshot.
const stream = client.run("Hello, AI!");
for await (const { event, snapshot } of stream) {
console.log("Event:", event.type);
console.log("Messages:", snapshot.messages);
console.log("Status:", snapshot.status);
}
The async iterator can only be iterated once (like ReadableStream). Multiple
iterations will throw an error.
Promise Mode
Await the final thread state when the stream completes.
const stream = client.run("Hello, AI!");
const thread = await stream.thread;
console.log("Final messages:", thread.messages);
console.log("Thread ID:", thread.id);
Hybrid Approach
You can combine both modes to observe events while also getting the final result.
const stream = client.run("Hello, AI!");
// Start observing events
(async () => {
for await (const { event, snapshot } of stream) {
if (event.type === "message_parent") {
console.log("Sub-message created:", event.messageId);
}
}
})();
// Also wait for final result
const thread = await stream.thread;
console.log("Done!", thread.messages.length);
Properties
thread
Promise that resolves to the final thread state when the stream completes.
const stream = client.run("Hello, AI!");
const thread: Promise<TamboThread> = stream.thread;
Promise that resolves when the stream completes with the final thread stateCurrent run status (will be “idle” when promise resolves)
When the thread was created
When the thread was last updated
Whether the last run was cancelled
Methods
abort()
Abort the stream. The .thread promise rejects with an AbortError. The async iterator ends cleanly.
const stream = client.run("Hello, AI!");
// Abort after 5 seconds
setTimeout(() => stream.abort(), 5000);
try {
const thread = await stream.thread;
} catch (error) {
if (error.name === "AbortError") {
console.log("Stream was aborted");
}
}
[Symbol.asyncIterator]()
Returns an async iterator over stream events. Can only be iterated once.
const stream = client.run("Hello, AI!");
for await (const { event, snapshot } of stream) {
// Process events
}
// This will throw an error:
// for await (const { event, snapshot } of stream) { }
Types
StreamEvent
Event yielded by the async iterator.
interface StreamEvent {
/** The raw AG-UI event */
event: BaseEvent;
/** A snapshot of the thread state after this event was processed */
snapshot: TamboThread;
}
TamboStreamOptions
Options for creating a TamboStream (internal use only—passed by TamboClient.run()).
interface TamboStreamOptions {
client: TamboAI;
message: InputMessage;
threadId: string | undefined;
userMessageText?: string;
componentList: ComponentRegistry;
toolRegistry: TamboToolRegistry;
userKey: string | undefined;
previousRunId: string | undefined;
additionalContext?: Record<string, unknown>;
toolChoice?: ToolChoice;
autoExecuteTools: boolean;
maxSteps: number;
debug: boolean;
signal?: AbortSignal;
dispatch: (action: StreamAction) => void;
getThreadSnapshot: (threadId: string) => TamboThread | undefined;
}
Event Types
The event property in each StreamEvent can be one of many AG-UI event types:
RUN_STARTED - Run begins, provides threadId and runId
TEXT_DELTA - Streaming text content
COMPONENT_START - Component block created
COMPONENT_PROPS_DELTA - Component props streaming
COMPONENT_STATE_DELTA - Component state streaming
COMPONENT_END - Component complete
TOOL_CALL_STARTED - Tool call begins
TOOL_CALL_ARGS - Tool arguments streaming
TOOL_CALL_DONE - Tool call complete
MESSAGE_PARENT - Sub-message created (MCP sampling/elicitation)
RUN_AWAITING_INPUT - Waiting for tool execution
RUN_COMPLETED - Run finished successfully
RUN_ERROR - Run failed with error
CUSTOM - Custom events (e.g., reasoning)
See @ag-ui/core documentation for full event reference.
Examples
Processing Specific Events
for await (const { event, snapshot } of stream) {
switch (event.type) {
case "text_delta":
process.stdout.write(event.delta);
break;
case "tool_call_started":
console.log(`\nCalling tool: ${event.toolName}`);
break;
case "tool_call_done":
console.log(`Tool ${event.toolName} completed`);
break;
case "run_completed":
console.log("\nRun complete!");
break;
}
}
Early Exit
for await (const { event, snapshot } of stream) {
console.log(event.type);
// Stop after first tool call
if (event.type === "tool_call_done") {
stream.abort();
break;
}
}
Error Handling
const stream = client.run("Hello, AI!");
try {
for await (const { event, snapshot } of stream) {
console.log(event.type);
}
console.log("Stream completed successfully");
} catch (error) {
if (error.name === "AbortError") {
console.log("Stream was aborted");
} else {
console.error("Stream failed:", error);
}
}
Timeout Pattern
const stream = client.run("Hello, AI!", {
signal: AbortSignal.timeout(30000) // 30 second timeout
});
try {
const thread = await stream.thread;
console.log("Completed within timeout");
} catch (error) {
if (error.name === "TimeoutError" || error.name === "AbortError") {
console.log("Stream timed out");
}
}
When autoExecuteTools is enabled (default), TamboStream automatically:
- Waits for
RUN_AWAITING_INPUT event
- Executes all pending tool calls
- Sends tool results back to the API
- Continues streaming the next response
- Repeats until completion or
maxSteps reached
This creates a seamless multi-step tool execution loop without manual intervention.
const stream = client.run("What's the weather in SF?", {
autoExecuteTools: true, // default
maxSteps: 10 // default
});
const thread = await stream.thread;
console.log("Tool execution completed automatically");
Thread Snapshots
Each event includes a snapshot of the thread state after processing that event. This allows you to observe the thread evolving in real-time.
for await (const { event, snapshot } of stream) {
console.log(`Event: ${event.type}`);
console.log(`Thread has ${snapshot.messages.length} messages`);
console.log(`Status: ${snapshot.status}`);
}
The snapshot is a frozen copy—modifying it won’t affect the client’s internal state.