Overview

The StreamProcessor module provides functions for processing the fullStream of an AI SDK streamText call. It forwards every stream event to WebSocket clients in real time and collects the complete response data.
Unlike the other core managers, it is a standalone function module rather than a class. It keeps no state between calls, and all of its side effects go through the callbacks passed to it.
Key responsibilities:
  • Process AI SDK fullStream events
  • Forward text deltas, reasoning, tool calls to WebSocket clients
  • Collect complete response data (text, reasoning, tool results)
  • Emit events on the agent for local handling
  • Support streaming speech integration via callbacks
  • Handle stream lifecycle (start, finish, error, abort)
Location: src/core/StreamProcessor.ts:40
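The responsibilities above come down to doing two things for each stream event: forward it and fold it into the collected result. The following is a minimal sketch of that idea, not the module's actual code. The Part shapes are simplified, and applyPart and Collected are hypothetical names introduced only for illustration.

```typescript
// Simplified, hypothetical part shapes; the real fullStream carries more types.
type Part =
  | { type: "text-delta"; id: string; text: string }
  | { type: "reasoning-delta"; id: string; text: string }
  | { type: "tool-call"; toolName: string; toolCallId: string; input: unknown };

interface Collected {
  fullText: string;
  fullReasoning: string;
  allToolCalls: Array<{ toolName: string; toolCallId: string; input: unknown }>;
}

type Send = (message: Record<string, unknown>) => void;

// Forward one part to the client and fold it into the collected state.
function applyPart(collected: Collected, part: Part, send: Send): void {
  switch (part.type) {
    case "text-delta":
      collected.fullText += part.text;
      send({ type: "text_delta", id: part.id, text: part.text });
      break;
    case "reasoning-delta":
      collected.fullReasoning += part.text;
      send({ type: "reasoning_delta", id: part.id, text: part.text });
      break;
    case "tool-call": {
      const { toolName, toolCallId, input } = part;
      collected.allToolCalls.push({ toolName, toolCallId, input });
      send({ type: "tool_call", toolName, toolCallId, input });
      break;
    }
  }
}

// Drive the sketch with a fixed list of parts instead of a live stream.
const sent: Array<Record<string, unknown>> = [];
const collected: Collected = { fullText: "", fullReasoning: "", allToolCalls: [] };
const parts: Part[] = [
  { type: "text-delta", id: "text-0", text: "Hello, " },
  { type: "text-delta", id: "text-0", text: "world" },
  { type: "tool-call", toolName: "get_weather", toolCallId: "call_abc123", input: { location: "SF" } },
];
for (const part of parts) applyPart(collected, part, (message) => sent.push(message));
```

Keeping the accumulator in a local object that is created per call is what lets the module stay stateless between calls.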

Functions

processFullStream()

Process the fullStream from an AI SDK streamText call, forwarding events and collecting results.
processFullStream(
  result: ReturnType<typeof streamText>,
  callbacks: StreamProcessorCallbacks,
  extraResponseFields?: Record<string, unknown>
): Promise<StreamResult>
result
ReturnType<typeof streamText>
required
The result object from AI SDK’s streamText() call
callbacks
StreamProcessorCallbacks
required
Callbacks for handling stream events:
  • onTextDelta(text: string) - Optional. Called for each text chunk (used for streaming speech)
  • onTextEnd() - Optional. Called when the text stream ends (used to flush the speech buffer)
  • sendMessage(msg) - Required. Function to send WebSocket messages
  • emitEvent(event, data) - Required. Function to emit events on the agent
extraResponseFields
Record<string, unknown>
Optional additional fields to include in the response_complete message
Returns: Promise resolving to StreamResult containing:
  • fullText - Complete response text
  • fullReasoning - Complete reasoning text (if any)
  • allToolCalls - Array of all tool calls made
  • allToolResults - Array of all tool results
  • allSources - Array of all sources (if any)
  • allFiles - Array of all files (if any)
Example:
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { processFullStream } from './core/StreamProcessor';

const result = streamText({
  model: openai('gpt-4-turbo'),
  messages: conversationHistory,
  tools: { /* ... */ }
});

const streamResult = await processFullStream(result, {
  onTextDelta: (text) => {
    // Send to speech manager for streaming TTS
    speechManager.processTextDelta(text);
  },
  onTextEnd: () => {
    // Flush remaining text to speech
    speechManager.flushPendingText();
  },
  sendMessage: (msg) => {
    // Send to WebSocket client
    wsManager.send(msg);
  },
  emitEvent: (event, data) => {
    // Emit on agent
    agent.emit(event, data);
  }
});

console.log('Full response:', streamResult.fullText);

handleStreamChunk()

Handle an individual stream chunk and emit it as an event. Intended for use inside the onChunk callback of streamText.
handleStreamChunk(
  chunk: any,
  emitEvent: (event: string, data?: unknown) => void
): void
chunk
any
required
A stream chunk from the AI SDK fullStream
emitEvent
(event: string, data?: unknown) => void
required
Function to emit events
Example:
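import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { handleStreamChunk } from './core/StreamProcessor';

const result = streamText({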
  model: openai('gpt-4-turbo'),
  messages,
  onChunk: (chunk) => {
    handleStreamChunk(chunk, (event, data) => {
      agent.emit(event, data);
    });
  }
});
Events emitted:
  • chunk:text_delta - Text chunk received
  • chunk:reasoning_delta - Reasoning chunk received
  • chunk:tool_call - Tool call chunk received
  • chunk:tool_result - Tool result chunk received
  • chunk:tool_input_start - Tool input started
  • chunk:tool_input_delta - Tool input chunk received
  • chunk:source - Source chunk received
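The event names listed above look like the chunk type with hyphens turned into underscores and a chunk: prefix. The sketch below encodes that reading as a small lookup. This rule is an assumption inferred from the list, and chunkEventName and forwardChunk are hypothetical helpers, not exports of the module.

```typescript
// Chunk types that correspond to the documented chunk:* events.
const FORWARDED_CHUNK_TYPES = new Set<string>([
  "text-delta",
  "reasoning-delta",
  "tool-call",
  "tool-result",
  "tool-input-start",
  "tool-input-delta",
  "source",
]);

// Assumed naming rule: "chunk:" + chunk type with "-" replaced by "_".
function chunkEventName(chunkType: string): string | undefined {
  if (!FORWARDED_CHUNK_TYPES.has(chunkType)) return undefined;
  return `chunk:${chunkType.replace(/-/g, "_")}`;
}

type Emit = (event: string, data?: unknown) => void;

// Emit a chunk under its derived event name; ignore chunk types not in the list.
function forwardChunk(chunk: { type: string }, emitEvent: Emit): void {
  const eventName = chunkEventName(chunk.type);
  if (eventName !== undefined) emitEvent(eventName, chunk);
}

// Record which event names are emitted for a few sample chunks.
const seen: string[] = [];
forwardChunk({ type: "text-delta" }, (event) => seen.push(event));
forwardChunk({ type: "tool-input-delta" }, (event) => seen.push(event));
forwardChunk({ type: "raw" }, (event) => seen.push(event));
```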

Types

StreamResult

interface StreamResult {
  fullText: string;
  fullReasoning: string;
  allToolCalls: Array<{
    toolName: string;
    toolCallId: string;
    input: unknown;
  }>;
  allToolResults: Array<{
    toolName: string;
    toolCallId: string;
    output: unknown;
  }>;
  allSources: Array<unknown>;
  allFiles: Array<unknown>;
}
fullText
string
The complete text response accumulated from all text-delta events
fullReasoning
string
The complete reasoning text accumulated from all reasoning-delta events (empty string if no reasoning)
allToolCalls
Array<{ toolName: string; toolCallId: string; input: unknown }>
All tool calls made during the stream, in order
allToolResults
Array<{ toolName: string; toolCallId: string; output: unknown }>
All tool results returned during the stream, in order
allSources
Array<unknown>
All sources emitted during the stream (for RAG/citation)
allFiles
Array<unknown>
All files emitted during the stream
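Because allToolCalls and allToolResults are separate arrays, a consumer that wants to see each call next to its result has to join them on toolCallId. The sketch below shows one way to do that. pairToolCalls and the entry type names are hypothetical and are not part of the module.

```typescript
// Entry shapes copied from the StreamResult interface above.
interface ToolCallEntry { toolName: string; toolCallId: string; input: unknown }
interface ToolResultEntry { toolName: string; toolCallId: string; output: unknown }

interface PairedCall extends ToolCallEntry {
  output?: unknown;
  hasResult: boolean; // false when no result with the same toolCallId was collected
}

// Join calls and results on toolCallId, preserving the order of the calls.
function pairToolCalls(calls: ToolCallEntry[], results: ToolResultEntry[]): PairedCall[] {
  const resultsById = new Map<string, ToolResultEntry>();
  for (const result of results) resultsById.set(result.toolCallId, result);

  return calls.map((call) => {
    const match = resultsById.get(call.toolCallId);
    return { ...call, output: match?.output, hasResult: match !== undefined };
  });
}

const paired = pairToolCalls(
  [
    { toolName: "get_weather", toolCallId: "call_abc123", input: { location: "San Francisco" } },
    { toolName: "search", toolCallId: "call_def456", input: { query: "weather" } },
  ],
  [
    { toolName: "get_weather", toolCallId: "call_abc123", output: { temperature: 72 } },
  ],
);
```

A call without a matching result (for example, one that ended in a tool error) is kept with hasResult set to false rather than being dropped.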

StreamProcessorCallbacks

interface StreamProcessorCallbacks {
  onTextDelta?: (text: string) => void;
  onTextEnd?: () => void;
  sendMessage: (message: Record<string, unknown>) => void;
  emitEvent: (event: string, data?: unknown) => void;
}
onTextDelta
(text: string) => void
Optional callback for each text chunk. Use this to feed text to SpeechManager for streaming TTS.
onTextEnd
() => void
Optional callback when text stream ends. Use this to flush the SpeechManager buffer.
sendMessage
(message: Record<string, unknown>) => void
required
Function to send messages to the WebSocket client
emitEvent
(event: string, data?: unknown) => void
required
Function to emit events on the agent EventEmitter
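Since every side effect goes through these four callbacks, a recording implementation can be handy when unit-testing code that calls processFullStream. The following is a sketch of such a test double. createRecordingCallbacks is a hypothetical helper, and the event name used at the end is a placeholder rather than one the module is known to emit.

```typescript
// Copied from the StreamProcessorCallbacks interface documented above.
interface StreamProcessorCallbacks {
  onTextDelta?: (text: string) => void;
  onTextEnd?: () => void;
  sendMessage: (message: Record<string, unknown>) => void;
  emitEvent: (event: string, data?: unknown) => void;
}

// Hypothetical test helper: records every callback invocation in arrays.
function createRecordingCallbacks() {
  const messages: Array<Record<string, unknown>> = [];
  const events: Array<{ event: string; data?: unknown }> = [];
  const textDeltas: string[] = [];
  let textEndCount = 0;

  const callbacks: StreamProcessorCallbacks = {
    onTextDelta: (text) => { textDeltas.push(text); },
    onTextEnd: () => { textEndCount += 1; },
    sendMessage: (message) => { messages.push(message); },
    emitEvent: (event, data) => { events.push({ event, data }); },
  };

  return { callbacks, messages, events, textDeltas, getTextEndCount: () => textEndCount };
}

// Exercise the callbacks by hand; in a real test, processFullStream would call them.
const recorder = createRecordingCallbacks();
recorder.callbacks.sendMessage({ type: "stream_start" });
recorder.callbacks.onTextDelta?.("Hello");
recorder.callbacks.onTextEnd?.();
recorder.callbacks.emitEvent("example_event", { text: "Hello" }); // placeholder event name
```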

WebSocket Message Protocol

The StreamProcessor sends the following messages to clients:

Stream Lifecycle

Stream start:
{ "type": "stream_start" }
Stream finish:
{
  "type": "stream_finish",
  "finishReason": "stop",
  "usage": {
    "promptTokens": 150,
    "completionTokens": 75,
    "totalTokens": 225
  }
}
Stream error:
{
  "type": "stream_error",
  "error": "Model timeout"
}
Stream abort:
{
  "type": "stream_abort",
  "reason": "User interrupted"
}

Step Lifecycle

Step start:
{
  "type": "step_start",
  "warnings": []
}
Step finish:
{
  "type": "step_finish",
  "finishReason": "stop",
  "usage": { /* ... */ }
}

Text Streaming

Text start:
{
  "type": "text_start",
  "id": "text-0"
}
Text delta:
{
  "type": "text_delta",
  "id": "text-0",
  "text": "Hello, "
}
Text end:
{
  "type": "text_end",
  "id": "text-0"
}
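On the receiving side, a client can rebuild each text block by keying on the id shared by text_start, text_delta, and text_end. The sketch below is one possible client-side handler. TextAssembler is a hypothetical class, and only the three message shapes shown above are assumed.

```typescript
// The three text messages shown above, as a discriminated union.
type TextMessage =
  | { type: "text_start"; id: string }
  | { type: "text_delta"; id: string; text: string }
  | { type: "text_end"; id: string };

// Hypothetical client-side helper that reassembles text blocks by id.
class TextAssembler {
  private inProgress = new Map<string, string>();
  private finished = new Map<string, string>();

  handle(message: TextMessage): void {
    switch (message.type) {
      case "text_start":
        this.inProgress.set(message.id, "");
        break;
      case "text_delta":
        // Tolerate a delta without a preceding start by treating it as empty.
        this.inProgress.set(message.id, (this.inProgress.get(message.id) ?? "") + message.text);
        break;
      case "text_end":
        this.finished.set(message.id, this.inProgress.get(message.id) ?? "");
        this.inProgress.delete(message.id);
        break;
    }
  }

  isStreaming(id: string): boolean {
    return this.inProgress.has(id);
  }

  getFinished(id: string): string | undefined {
    return this.finished.get(id);
  }
}

// Feed raw JSON frames, as they would arrive over the WebSocket.
const assembler = new TextAssembler();
const incoming = [
  '{"type":"text_start","id":"text-0"}',
  '{"type":"text_delta","id":"text-0","text":"Hello, "}',
  '{"type":"text_delta","id":"text-0","text":"world"}',
  '{"type":"text_end","id":"text-0"}',
];
for (const raw of incoming) assembler.handle(JSON.parse(raw) as TextMessage);
```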

Reasoning Streaming

Reasoning start:
{
  "type": "reasoning_start",
  "id": "reasoning-0"
}
Reasoning delta:
{
  "type": "reasoning_delta",
  "id": "reasoning-0",
  "text": "Let me think... "
}
Reasoning end:
{
  "type": "reasoning_end",
  "id": "reasoning-0"
}

Tool Execution

Tool call:
{
  "type": "tool_call",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "input": { "location": "San Francisco" }
}
Tool result:
{
  "type": "tool_result",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "result": { "temperature": 72, "conditions": "sunny" }
}
Tool error:
{
  "type": "tool_error",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "error": "API timeout"
}
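The toolCallId field ties a tool_call to the tool_result or tool_error that follows it, so a client can track the state of each call. The sketch below shows one way to do this under that assumption. trackTools and ToolStatus are hypothetical names, and the message shapes are taken from the examples above.

```typescript
// The three tool messages shown above, as a discriminated union.
type ToolMessage =
  | { type: "tool_call"; toolName: string; toolCallId: string; input: unknown }
  | { type: "tool_result"; toolName: string; toolCallId: string; result: unknown }
  | { type: "tool_error"; toolName: string; toolCallId: string; error: string };

type ToolStatus = "pending" | "done" | "failed";

// Derive the latest status of every tool call from the messages seen so far.
function trackTools(messages: ToolMessage[]): Map<string, ToolStatus> {
  const statusById = new Map<string, ToolStatus>();
  for (const message of messages) {
    if (message.type === "tool_call") statusById.set(message.toolCallId, "pending");
    else if (message.type === "tool_result") statusById.set(message.toolCallId, "done");
    else statusById.set(message.toolCallId, "failed");
  }
  return statusById;
}

const toolStatuses = trackTools([
  { type: "tool_call", toolName: "get_weather", toolCallId: "call_abc123", input: { location: "San Francisco" } },
  { type: "tool_call", toolName: "search", toolCallId: "call_def456", input: { query: "weather" } },
  { type: "tool_call", toolName: "search", toolCallId: "call_ghi789", input: { query: "forecast" } },
  { type: "tool_result", toolName: "get_weather", toolCallId: "call_abc123", result: { temperature: 72 } },
  { type: "tool_error", toolName: "search", toolCallId: "call_def456", error: "API timeout" },
]);
```

A call that is still "pending" when response_complete arrives never produced a result or an error, which is worth surfacing in a client UI.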

Tool Input Streaming

Tool input start:
{
  "type": "tool_input_start",
  "id": "input-0",
  "toolName": "search"
}
Tool input delta:
{
  "type": "tool_input_delta",
  "id": "input-0",
  "delta": { "query": "weather" }
}
Tool input end:
{
  "type": "tool_input_end",
  "id": "input-0"
}

Sources and Files

Source:
{
  "type": "source",
  "source": { /* source object */ }
}
File:
{
  "type": "file",
  "file": { /* file object */ }
}

Response Complete

Complete response:
{
  "type": "response_complete",
  "text": "The weather in San Francisco is 72 degrees and sunny.",
  "reasoning": "",
  "toolCalls": [
    {
      "toolName": "get_weather",
      "toolCallId": "call_abc123",
      "input": { "location": "San Francisco" }
    }
  ],
  "toolResults": [
    {
      "toolName": "get_weather",
      "toolCallId": "call_abc123",
      "output": { "temperature": 72, "conditions": "sunny" }
    }
  ],
  "sources": [],
  "files": []
}
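The response_complete message mirrors the fields of StreamResult, plus whatever is passed as extraResponseFields. The sketch below shows how such a message could be assembled. buildResponseComplete is a hypothetical function, and the merge order (core fields win over extra fields on a name collision) is an assumption, since the source does not specify it.

```typescript
// Copied from the StreamResult interface documented above.
interface StreamResult {
  fullText: string;
  fullReasoning: string;
  allToolCalls: Array<{ toolName: string; toolCallId: string; input: unknown }>;
  allToolResults: Array<{ toolName: string; toolCallId: string; output: unknown }>;
  allSources: Array<unknown>;
  allFiles: Array<unknown>;
}

// Hypothetical builder for the response_complete message.
function buildResponseComplete(
  result: StreamResult,
  extraResponseFields: Record<string, unknown> = {},
): Record<string, unknown> {
  return {
    // Assumption: extra fields are spread first so they cannot overwrite core fields.
    ...extraResponseFields,
    type: "response_complete",
    text: result.fullText,
    reasoning: result.fullReasoning,
    toolCalls: result.allToolCalls,
    toolResults: result.allToolResults,
    sources: result.allSources,
    files: result.allFiles,
  };
}

const completeMessage = buildResponseComplete(
  {
    fullText: "The weather in San Francisco is 72 degrees and sunny.",
    fullReasoning: "",
    allToolCalls: [],
    allToolResults: [],
    allSources: [],
    allFiles: [],
  },
  { requestId: "req-1", type: "ignored" }, // hypothetical extra fields
);
```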

Usage in Agent Architecture

class VoiceAgent {
  async handleUserInput(text: string) {
    // Add user message to history
    this.conversationManager.addMessage({
      role: 'user',
      content: text
    });
    
    // Start LLM stream
    const result = streamText({
      model: this.model,
      messages: this.conversationManager.getHistory(),
      tools: this.tools
    });
    
    // Process the stream
    const streamResult = await processFullStream(result, {
      onTextDelta: (text) => {
        // Feed text to speech manager for streaming TTS
        this.speechManager.processTextDelta(text);
      },
      onTextEnd: () => {
        // Flush remaining text to speech
        this.speechManager.flushPendingText();
      },
      sendMessage: (msg) => {
        // Send to WebSocket client
        this.wsManager.send(msg);
      },
      emitEvent: (event, data) => {
        // Emit on agent
        this.emit(event, data);
      }
    });
    
    // Add assistant message to history
    this.conversationManager.addMessage({
      role: 'assistant',
      content: streamResult.fullText
    });
    
    // Wait for speech to complete
    await this.speechManager.queueDonePromise;
  }
}

Event Flow

Typical event sequence for a simple text response:
1. stream_start
2. step_start
3. text_start (id: "text-0")
4. text_delta (id: "text-0", text: "Hello")
5. text_delta (id: "text-0", text: ", how")
6. text_delta (id: "text-0", text: " can I")
7. text_delta (id: "text-0", text: " help you?")
8. text_end (id: "text-0")
9. step_finish
10. stream_finish
11. response_complete
With tool calls:
1. stream_start
2. step_start
3. tool_input_start (id: "input-0", toolName: "get_weather")
4. tool_input_delta (id: "input-0", delta: {"location": "SF"})
5. tool_input_end (id: "input-0")
6. tool_call (toolName: "get_weather", input: {"location": "SF"})
7. tool_result (toolName: "get_weather", result: {"temp": 72})
8. step_finish
9. step_start (new step for response after tool call)
10. text_start (id: "text-0")
11. text_delta (id: "text-0", text: "The weather...")
12. text_end (id: "text-0")
13. step_finish
14. stream_finish
15. response_complete
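The two sequences above imply a few ordering rules that a client or a test can check: the stream opens with stream_start, closes with response_complete, steps are balanced, and text deltas fall between text_start and text_end. The sketch below checks only those rules. checkSequence is a hypothetical helper, and the rules are inferred from the examples rather than stated as guarantees by the source.

```typescript
// Return a list of ordering problems; an empty list means the sequence looks valid.
function checkSequence(types: string[]): string[] {
  const problems: string[] = [];
  if (types[0] !== "stream_start") problems.push("first message is not stream_start");
  if (types[types.length - 1] !== "response_complete") {
    problems.push("last message is not response_complete");
  }

  let openSteps = 0;
  let textOpen = false;
  for (const type of types) {
    if (type === "step_start") {
      openSteps += 1;
    } else if (type === "step_finish") {
      if (openSteps === 0) problems.push("step_finish without step_start");
      else openSteps -= 1;
    } else if (type === "text_start") {
      textOpen = true;
    } else if (type === "text_end") {
      textOpen = false;
    } else if (type === "text_delta" && !textOpen) {
      problems.push("text_delta outside text_start/text_end");
    }
  }
  if (openSteps !== 0) problems.push("unclosed step");
  return problems;
}

// The tool-call sequence from the list above.
const toolSequence = [
  "stream_start", "step_start",
  "tool_input_start", "tool_input_delta", "tool_input_end",
  "tool_call", "tool_result", "step_finish",
  "step_start", "text_start", "text_delta", "text_end", "step_finish",
  "stream_finish", "response_complete",
];
const toolSequenceProblems = checkSequence(toolSequence);
const brokenProblems = checkSequence(["stream_start", "text_delta", "response_complete"]);
```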

Integration with Speech Manager

The StreamProcessor enables streaming TTS by calling onTextDelta for each text chunk:
// In processFullStream
case "text-delta":
  fullText += part.text;
  onTextDelta?.(part.text); // Feed to SpeechManager
  sendMessage({
    type: "text_delta",
    id: part.id,
    text: part.text
  });
  break;

case "text-end":
  onTextEnd?.(); // Flush SpeechManager buffer
  sendMessage({ type: "text_end", id: part.id });
  break;
This allows speech generation to start before the full response is complete, significantly reducing perceived latency.
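To make the latency benefit concrete, the sketch below shows one way the receiving side of onTextDelta and onTextEnd could buffer deltas and release whole sentences to a TTS engine as soon as they are complete. SentenceBuffer is a hypothetical stand-in; the real SpeechManager is not shown in this document and may chunk text differently. Only the method names processTextDelta and flushPendingText are taken from the examples above.

```typescript
// Hypothetical stand-in for the speech side of the pipeline.
class SentenceBuffer {
  private pending = "";
  private readonly speak: (sentence: string) => void;

  constructor(speak: (sentence: string) => void) {
    this.speak = speak;
  }

  // Append a delta and speak every sentence that is now complete.
  processTextDelta(text: string): void {
    this.pending += text;
    // A sentence is complete once ".", "!" or "?" is followed by whitespace.
    const boundary = /[.!?]\s+/g;
    let consumed = 0;
    let match: RegExpExecArray | null;
    while ((match = boundary.exec(this.pending)) !== null) {
      const sentenceEnd = match.index + 1; // keep the punctuation mark
      this.speak(this.pending.slice(consumed, sentenceEnd).trim());
      consumed = match.index + match[0].length;
    }
    this.pending = this.pending.slice(consumed);
  }

  // Speak whatever is left, even if it does not end in punctuation.
  flushPendingText(): void {
    const rest = this.pending.trim();
    this.pending = "";
    if (rest.length > 0) this.speak(rest);
  }
}

const spoken: string[] = [];
const sentenceBuffer = new SentenceBuffer((sentence) => { spoken.push(sentence); });
for (const delta of ["Hello there. How", " are you? I'm", " fine"]) {
  sentenceBuffer.processTextDelta(delta);
}
sentenceBuffer.flushPendingText();
```

In this sketch the first sentence is handed to the speak callback while later deltas are still arriving, which is the effect described above.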

Error Handling

Stream Errors

When the AI SDK stream encounters an error:
case "error":
  emitEvent("error", part.error);
  sendMessage({
    type: "stream_error",
    error: String(part.error)
  });
  break;

Abort Handling

When the stream is aborted:
case "abort":
  emitEvent("abort", { reason: part.reason });
  sendMessage({
    type: "stream_abort",
    reason: part.reason
  });
  break;
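One detail of the stream_error handler is worth knowing when reading the error field on the client: String() applied to an Error instance includes the error name, and applied to a plain object it yields "[object Object]". The sketch below demonstrates this and shows a hypothetical alternative, describeError, that is not part of the module.

```typescript
// Hypothetical helper that produces a more readable string than String(error).
function describeError(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "string") return error;
  try {
    // JSON.stringify returns undefined for values such as undefined or functions.
    const json: string | undefined = JSON.stringify(error);
    return json ?? String(error);
  } catch {
    // Circular structures make JSON.stringify throw; fall back to String().
    return String(error);
  }
}

const timeoutError = new Error("Model timeout");
const viaString = String(timeoutError);          // "Error: Model timeout"
const viaHelper = describeError(timeoutError);   // "Model timeout"
const objectViaString = String({ code: 429 });   // "[object Object]"
const objectViaHelper = describeError({ code: 429 }); // '{"code":429}'
```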

Performance Considerations

Memory Usage

  • Text accumulation: Stores complete text/reasoning in memory
  • Tool data: Stores all tool calls and results
  • Stream processing: Minimal overhead, processes events one at a time

Optimization Tips

  1. Keep onTextDelta light - It runs for every text chunk, so avoid heavy processing in this callback
  2. Batch WebSocket sends - The processor sends one message per event; if that is too chatty for your transport, batch inside your sendMessage implementation
  3. Monitor memory - For very long responses, consider streaming to storage
  4. Handle backpressure - The WebSocket may not keep up with fast streams
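As an illustration of the batching tip, the sketch below wraps a transport in a size-based batcher that can be passed as sendMessage. createBatcher is a hypothetical helper. Sending an array of messages in one frame is not part of the documented protocol, so the client would need to be changed to accept batches.

```typescript
type OutgoingMessage = Record<string, unknown>;

// Hypothetical wrapper: buffers messages and hands them to sendBatch in groups.
function createBatcher(sendBatch: (batch: OutgoingMessage[]) => void, maxSize: number) {
  let buffer: OutgoingMessage[] = [];

  const flush = (): void => {
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = [];
    sendBatch(batch);
  };

  const sendMessage = (message: OutgoingMessage): void => {
    buffer.push(message);
    const type = message["type"];
    // Flush when the batch is full, or right away on messages that close a unit
    // of work, so the client is never left waiting on a partial batch.
    if (buffer.length >= maxSize || type === "text_end" || type === "response_complete") {
      flush();
    }
  };

  return { sendMessage, flush };
}

// Record batch sizes instead of writing to a real WebSocket.
const batchSizes: number[] = [];
const batcher = createBatcher((batch) => { batchSizes.push(batch.length); }, 3);
for (let i = 0; i < 4; i++) {
  batcher.sendMessage({ type: "text_delta", id: "text-0", text: `chunk ${i}` });
}
batcher.sendMessage({ type: "text_end", id: "text-0" });
batcher.flush(); // no-op here: the buffer is already empty
```

A count-based batcher keeps the example deterministic; a time-based flush (for example, every few milliseconds) is a common alternative when deltas arrive in bursts.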

Thread Safety

The StreamProcessor is stateless, so the processing logic itself is safe to run for several streams concurrently. The callbacks, however, must run on the same event loop as the agent and managers.
