
Overview

The TamboStream class handles streaming AI responses with tool execution. It provides two consumption modes:
  1. AsyncIterable: for await (const { event, snapshot } of stream) { ... }
  2. Promise: const thread = await stream.thread
The internal processing loop starts automatically when the stream is constructed (fire-and-forget), whichever mode you use. The .thread promise resolves when the loop completes.
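To make the two modes concrete, here is a minimal, self-contained sketch of a stream-like class whose loop starts in the constructor and feeds both an async iterator and a final-result promise. MiniStream, its numeric events, and its running-total snapshot are hypothetical stand-ins chosen for illustration; this is not TamboStream's actual implementation.

```typescript
// Hypothetical illustration only: MiniStream is not part of @tambo-ai/client.
type Item = { event: number; snapshot: number };

class MiniStream implements AsyncIterable<Item> {
  /** Resolves with the final total once the internal loop finishes. */
  readonly result: Promise<number>;
  private queue: Item[] = [];
  private done = false;
  private wake: (() => void) | null = null;

  constructor(source: number[]) {
    // Fire-and-forget: the loop starts here, whether or not anyone iterates.
    this.result = this.run(source);
  }

  private async run(source: number[]): Promise<number> {
    let total = 0;
    for (const event of source) {
      await Promise.resolve(); // stand-in for waiting on the network
      total += event;
      this.queue.push({ event, snapshot: total });
      this.wake?.(); // wake a waiting iterator, if there is one
    }
    this.done = true;
    this.wake?.();
    return total;
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<Item> {
    while (true) {
      const next = this.queue.shift();
      if (next !== undefined) {
        yield next;
      } else if (this.done) {
        return;
      } else {
        // Sleep until the loop pushes another item or finishes.
        await new Promise<void>((resolve) => {
          this.wake = () => resolve();
        });
      }
    }
  }
}
```

Iterating new MiniStream([1, 2, 3]) with for await yields snapshots 1, 3, 6, and awaiting its result gives 6 even if nothing ever iterates it, because the loop does not depend on a consumer.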

Installation

npm install @tambo-ai/client

Usage

You typically don’t construct TamboStream directly; it’s returned from TamboClient.run().
import { TamboClient } from "@tambo-ai/client";

const client = new TamboClient({ apiKey: "your-api-key" });
const stream = client.run("Hello, AI!");

Consumption Modes

Async Iteration (Stream Mode)

Iterate over events as they arrive. Each iteration yields an event and a thread snapshot.
const stream = client.run("Hello, AI!");

for await (const { event, snapshot } of stream) {
  console.log("Event:", event.type);
  console.log("Messages:", snapshot.messages);
  console.log("Status:", snapshot.status);
}
The async iterator can only be iterated once (like ReadableStream). Multiple iterations will throw an error.

Promise Mode

Await the final thread state when the stream completes.
const stream = client.run("Hello, AI!");
const thread = await stream.thread;

console.log("Final messages:", thread.messages);
console.log("Thread ID:", thread.id);

Hybrid Approach

You can combine both modes to observe events while also getting the final result.
const stream = client.run("Hello, AI!");

// Start observing events
(async () => {
  for await (const { event, snapshot } of stream) {
    if (event.type === "message_parent") {
      console.log("Sub-message created:", event.messageId);
    }
  }
})().catch((error) => console.error("Event iteration failed:", error));

// Also wait for final result
const thread = await stream.thread;
console.log("Done!", thread.messages.length);

Properties

thread

Promise that resolves to the final thread state when the stream completes.
const stream = client.run("Hello, AI!");
const thread: Promise<TamboThread> = stream.thread;
  • thread (Promise<TamboThread>) - Promise that resolves when the stream completes with the final thread state
  • thread.id (string) - Unique thread identifier
  • thread.messages (TamboThreadMessage[]) - Messages in the thread
  • thread.status (RunStatus) - Current run status (will be “idle” when promise resolves)
  • thread.name (string) - Thread name (if set)
  • thread.metadata (Record<string, unknown>) - Thread metadata
  • thread.createdAt (string) - When the thread was created
  • thread.updatedAt (string) - When the thread was last updated
  • thread.lastRunCancelled (boolean) - Whether the last run was cancelled

Methods

abort()

Abort the stream. The .thread promise rejects with an AbortError. The async iterator ends cleanly.
const stream = client.run("Hello, AI!");

// Abort after 5 seconds
setTimeout(() => stream.abort(), 5000);

try {
  const thread = await stream.thread;
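} catch (error) {
  // `error` is `unknown` under strict TypeScript, so narrow it before reading `.name`
  if (error instanceof Error && error.name === "AbortError") {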
    console.log("Stream was aborted");
  }
}
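The split behaviour described above (promise rejects, iterator simply stops) can be sketched with a plain AbortSignal. startCounter and abortError are hypothetical names invented for this illustration, and unlike TamboStream this sketch only advances while it is being iterated; it is not the library's implementation.

```typescript
// Hypothetical illustration; not TamboStream's actual implementation.
function abortError(): Error {
  const error = new Error("The operation was aborted");
  error.name = "AbortError";
  return error;
}

function startCounter(limit: number, signal: AbortSignal) {
  let resolveResult!: (total: number) => void;
  let rejectResult!: (reason: Error) => void;
  const result = new Promise<number>((resolve, reject) => {
    resolveResult = resolve;
    rejectResult = reject;
  });

  async function* events(): AsyncGenerator<number> {
    let emitted = 0;
    try {
      for (let i = 0; i < limit; i++) {
        await Promise.resolve(); // stand-in for waiting on the network
        if (signal.aborted) return; // iterator side: just stop, no throw
        emitted++;
        yield i;
      }
    } finally {
      // Promise side: settle exactly once, however the loop ended.
      if (signal.aborted) rejectResult(abortError());
      else resolveResult(emitted);
    }
  }

  return { events: events(), result };
}
```

A consumer that calls controller.abort() partway through sees its for await loop exit normally, while awaiting result throws an error whose name is "AbortError".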

[Symbol.asyncIterator]()

Returns an async iterator over stream events. Can only be iterated once.
const stream = client.run("Hello, AI!");

for await (const { event, snapshot } of stream) {
  // Process events
}

// This will throw an error:
// for await (const { event, snapshot } of stream) { }
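A single-use guard of this kind can be sketched in a few lines. The once helper and the numbers generator below are hypothetical and exist only to show the pattern; the error TamboStream actually throws, and its message, may differ.

```typescript
// Hypothetical helper, not part of @tambo-ai/client.
function once<T>(source: AsyncIterable<T>): AsyncIterable<T> {
  let consumed = false;
  return {
    [Symbol.asyncIterator](): AsyncIterator<T> {
      if (consumed) {
        throw new Error("This stream has already been iterated");
      }
      consumed = true;
      return source[Symbol.asyncIterator]();
    },
  };
}

// Stand-in event source for demonstration.
async function* numbers(): AsyncGenerator<number> {
  yield 1;
  yield 2;
}
```

The first for await over once(numbers()) runs to completion; a second loop over the same wrapped object throws before any value is produced.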

Types

StreamEvent

Event yielded by the async iterator.
interface StreamEvent {
  /** The raw AG-UI event */
  event: BaseEvent;
  /** A snapshot of the thread state after this event was processed */
  snapshot: TamboThread;
}

TamboStreamOptions

Options for creating a TamboStream (internal use only—passed by TamboClient.run()).
interface TamboStreamOptions {
  client: TamboAI;
  message: InputMessage;
  threadId: string | undefined;
  userMessageText?: string;
  componentList: ComponentRegistry;
  toolRegistry: TamboToolRegistry;
  userKey: string | undefined;
  previousRunId: string | undefined;
  additionalContext?: Record<string, unknown>;
  toolChoice?: ToolChoice;
  autoExecuteTools: boolean;
  maxSteps: number;
  debug: boolean;
  signal?: AbortSignal;
  dispatch: (action: StreamAction) => void;
  getThreadSnapshot: (threadId: string) => TamboThread | undefined;
}

Event Types

The event property in each StreamEvent can be one of many AG-UI event types:
  • RUN_STARTED - Run begins, provides threadId and runId
  • TEXT_DELTA - Streaming text content
  • COMPONENT_START - Component block created
  • COMPONENT_PROPS_DELTA - Component props streaming
  • COMPONENT_STATE_DELTA - Component state streaming
  • COMPONENT_END - Component complete
  • TOOL_CALL_STARTED - Tool call begins
  • TOOL_CALL_ARGS - Tool arguments streaming
  • TOOL_CALL_DONE - Tool call complete
  • MESSAGE_PARENT - Sub-message created (MCP sampling/elicitation)
  • RUN_AWAITING_INPUT - Waiting for tool execution
  • RUN_COMPLETED - Run finished successfully
  • RUN_ERROR - Run failed with error
  • CUSTOM - Custom events (e.g., reasoning)
See @ag-ui/core documentation for full event reference.
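When exploring which of these events a given run actually emits, and the exact type strings it uses, a small tally helper can be useful. The sketch below assumes only that each yielded item has an event with a string type; countEventTypes, MinimalEvent, and fakeStream are hypothetical names, and the fake stream stands in for a real one returned by client.run().

```typescript
// Hypothetical minimal shape; real AG-UI events carry more fields.
interface MinimalEvent {
  type: string;
}

async function countEventTypes(
  stream: AsyncIterable<{ event: MinimalEvent }>,
): Promise<Map<string, number>> {
  const counts = new Map<string, number>();
  for await (const { event } of stream) {
    counts.set(event.type, (counts.get(event.type) ?? 0) + 1);
  }
  return counts;
}

// Stand-in stream for demonstration only.
async function* fakeStream(): AsyncGenerator<{ event: MinimalEvent }> {
  for (const type of ["RUN_STARTED", "TEXT_DELTA", "TEXT_DELTA", "RUN_COMPLETED"]) {
    yield { event: { type } };
  }
}
```

Because the helper consumes the iterator, it uses up the stream's single allowed iteration; use it for exploration rather than alongside another for await loop on the same stream.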

Examples

Processing Specific Events

for await (const { event, snapshot } of stream) {
  switch (event.type) {
    case "text_delta":
      process.stdout.write(event.delta);
      break;

    case "tool_call_started":
      console.log(`\nCalling tool: ${event.toolName}`);
      break;

    case "tool_call_done":
      console.log(`Tool ${event.toolName} completed`);
      break;

    case "run_completed":
      console.log("\nRun complete!");
      break;
  }
}

Early Exit

for await (const { event, snapshot } of stream) {
  console.log(event.type);

  // Stop after first tool call
  if (event.type === "tool_call_done") {
    stream.abort();
    break;
  }
}

Error Handling

const stream = client.run("Hello, AI!");

try {
  for await (const { event, snapshot } of stream) {
    console.log(event.type);
  }
  console.log("Stream completed successfully");
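} catch (error) {
  // `error` is `unknown` under strict TypeScript, so narrow it before reading `.name`
  if (error instanceof Error && error.name === "AbortError") {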
    console.log("Stream was aborted");
  } else {
    console.error("Stream failed:", error);
  }
}

Timeout Pattern

const stream = client.run("Hello, AI!", {
  signal: AbortSignal.timeout(30000) // 30 second timeout
});

try {
  const thread = await stream.thread;
  console.log("Completed within timeout");
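} catch (error) {
  // `error` is `unknown` under strict TypeScript, so narrow it before reading `.name`
  if (error instanceof Error && (error.name === "TimeoutError" || error.name === "AbortError")) {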
    console.log("Stream timed out");
  }
}

Tool Execution

When autoExecuteTools is enabled (default), TamboStream automatically:
  1. Waits for RUN_AWAITING_INPUT event
  2. Executes all pending tool calls
  3. Sends tool results back to the API
  4. Continues streaming the next response
  5. Repeats until completion or maxSteps reached
This creates a seamless multi-step tool execution loop without manual intervention.
const stream = client.run("What's the weather in SF?", {
  autoExecuteTools: true, // default
  maxSteps: 10 // default
});

const thread = await stream.thread;
console.log("Tool execution completed automatically");
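The numbered steps above can be sketched as a plain loop. Everything here is a hypothetical simplification: Model, Tool, StepResult, and runToolLoop are illustrative names rather than @tambo-ai/client APIs, and throwing when maxSteps is exhausted is this sketch's own choice, since the source does not say what TamboStream does in that case.

```typescript
// Hypothetical types and names for illustration; not the @tambo-ai/client API.
interface ToolCall {
  name: string;
  args: unknown;
}

type StepResult =
  | { status: "completed"; text: string }
  | { status: "awaiting_input"; toolCalls: ToolCall[] };

type Tool = (args: unknown) => Promise<unknown>;
type Model = (toolResults: unknown[]) => Promise<StepResult>;

async function runToolLoop(
  model: Model,
  tools: Record<string, Tool>,
  maxSteps: number,
): Promise<string> {
  let toolResults: unknown[] = [];
  for (let step = 0; step < maxSteps; step++) {
    // Steps 1 and 4: get the next response and see whether it needs tool input.
    const result = await model(toolResults);
    if (result.status === "completed") return result.text;

    // Steps 2 and 3: run every pending tool call, then feed results to the next step.
    toolResults = await Promise.all(
      result.toolCalls.map((call) => {
        const tool = tools[call.name];
        if (!tool) throw new Error(`Unknown tool: ${call.name}`);
        return tool(call.args);
      }),
    );
  }
  // Step 5: assumption of this sketch; the real behaviour at maxSteps may differ.
  throw new Error(`Stopped after ${maxSteps} steps without completing`);
}
```

The maxSteps bound is what keeps a model that requests tools on every turn from looping indefinitely.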

Thread Snapshots

Each event includes a snapshot of the thread state after processing that event. This allows you to observe the thread evolving in real time.
for await (const { event, snapshot } of stream) {
  console.log(`Event: ${event.type}`);
  console.log(`Thread has ${snapshot.messages.length} messages`);
  console.log(`Status: ${snapshot.status}`);
}
The snapshot is a frozen copy, so modifying it won’t affect the client’s internal state.
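As a rough picture of what "frozen copy" means, the sketch below copies a small state object and deep-freezes the copy. ThreadState, deepFreeze, and takeSnapshot are hypothetical names for illustration; how TamboStream actually builds its snapshots is not described in the source.

```typescript
// Hypothetical sketch; not how @tambo-ai/client necessarily builds snapshots.
interface ThreadState {
  status: string;
  messages: string[];
}

function deepFreeze<T>(value: T): T {
  if (typeof value === "object" && value !== null) {
    for (const child of Object.values(value)) deepFreeze(child);
    Object.freeze(value);
  }
  return value;
}

function takeSnapshot(state: ThreadState): Readonly<ThreadState> {
  // Copy first so that freezing never touches the live state.
  const copy: ThreadState = { status: state.status, messages: [...state.messages] };
  return deepFreeze(copy);
}
```

Later changes to the live state do not show up in an earlier snapshot, and attempts to write to the snapshot fail (calling push on its frozen messages array throws a TypeError).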
