Streaming API

The streaming API provides real-time access to model responses as they are generated. This enables building responsive UIs and handling partial results before the full response is complete.

Streaming vs Complete

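There are two ways to interact with a model: stream() emits events while the response is being generated, and complete() resolves with the final assistant message only. The streaming form looks like this:
import { getModel, stream } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-4o-mini');
const context = {
  messages: [{ role: 'user', content: 'Write a short poem' }]
};
const s = stream(model, context);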

for await (const event of s) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  }
}

const message = await s.result();
console.log(`Total tokens: ${message.usage.totalTokens}`);

Event Types

All events emitted during assistant message generation:
| Event Type | Description | Key Properties |
| --- | --- | --- |
| start | Stream begins | partial: Initial assistant message |
| text_start | Text block starts | contentIndex: Position in content array |
| text_delta | Text chunk received | delta: New text, contentIndex |
| text_end | Text block complete | content: Full text, contentIndex |
| thinking_start | Thinking block starts | contentIndex: Position in content array |
| thinking_delta | Thinking chunk received | delta: New text, contentIndex |
| thinking_end | Thinking block complete | content: Full thinking, contentIndex |
| toolcall_start | Tool call begins | contentIndex: Position in content array |
| toolcall_delta | Tool arguments streaming | delta: JSON chunk, partial.content[contentIndex].arguments |
| toolcall_end | Tool call complete | toolCall: Complete tool call with id, name, arguments |
| done | Stream complete | reason: Stop reason, message: Final assistant message |
| error | Error occurred | reason: "error" or "aborted", error: AssistantMessage with partial content |
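
To make the table concrete, here is a minimal sketch of how the text events could be folded into complete blocks. The `SketchEvent` type and the `collectText` helper are assumptions written for this illustration; they only mirror the properties listed in the table and are not the library's exported types.

```typescript
// Simplified, assumed event shapes modeled on the table above.
// The real library types may carry more fields (for example `partial`).
type SketchEvent =
  | { type: 'start' }
  | { type: 'text_start'; contentIndex: number }
  | { type: 'text_delta'; contentIndex: number; delta: string }
  | { type: 'text_end'; contentIndex: number; content: string }
  | { type: 'done'; reason: string };

// Accumulate streamed text per content block, keyed by contentIndex.
function collectText(events: SketchEvent[]): Map<number, string> {
  const blocks = new Map<number, string>();
  for (const event of events) {
    switch (event.type) {
      case 'text_start':
        blocks.set(event.contentIndex, '');
        break;
      case 'text_delta':
        blocks.set(event.contentIndex, (blocks.get(event.contentIndex) ?? '') + event.delta);
        break;
      case 'text_end':
        // text_end carries the full text, so it replaces the accumulated deltas.
        blocks.set(event.contentIndex, event.content);
        break;
      // 'start' and 'done' carry no text and are ignored here.
    }
  }
  return blocks;
}

const sampleEvents: SketchEvent[] = [
  { type: 'start' },
  { type: 'text_start', contentIndex: 0 },
  { type: 'text_delta', contentIndex: 0, delta: 'Hel' },
  { type: 'text_delta', contentIndex: 0, delta: 'lo' },
  { type: 'text_end', contentIndex: 0, content: 'Hello' },
  { type: 'done', reason: 'stop' },
];

console.log(collectText(sampleEvents).get(0)); // Hello
```

Keying by contentIndex keeps blocks separate when a response contains more than one text block.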

Basic Streaming

import { getModel, stream } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-4o-mini');
const context = {
  messages: [{ role: 'user', content: 'Write a short poem' }]
};

const s = stream(model, context);

for await (const event of s) {
  switch (event.type) {
    case 'start':
      console.log(`Starting with ${event.partial.model}`);
      break;
    case 'text_start':
      console.log('\n[Text started]');
      break;
    case 'text_delta':
      process.stdout.write(event.delta);
      break;
    case 'text_end':
      console.log('\n[Text ended]');
      break;
    case 'done':
      console.log(`\nFinished: ${event.reason}`);
      break;
    case 'error':
      console.error(`Error: ${event.error.errorMessage}`);
      break;
  }
}

const message = await s.result();
console.log(`Tokens: ${message.usage.totalTokens}`);
console.log(`Cost: $${message.usage.cost.total.toFixed(4)}`);

Streaming with Tool Calls

import { getModel, stream, Type, Tool } from '@mariozechner/pi-ai';

const tools: Tool[] = [{
  name: 'get_weather',
  description: 'Get current weather',
  parameters: Type.Object({
    location: Type.String({ description: 'City name' })
  })
}];

const context = {
  messages: [{ role: 'user', content: 'What is the weather in Paris?' }],
  tools
};

const model = getModel('openai', 'gpt-4o-mini');
const s = stream(model, context);

for await (const event of s) {
  switch (event.type) {
    case 'text_delta':
      process.stdout.write(event.delta);
      break;
    
    case 'toolcall_start':
      console.log(`\n[Tool call started: index ${event.contentIndex}]`);
      break;
    
    case 'toolcall_delta': {
      // Partial tool arguments are being streamed
      // (braces scope the const declaration to this case)
      const partialCall = event.partial.content[event.contentIndex];
      if (partialCall.type === 'toolCall') {
        console.log(`[Streaming args for ${partialCall.name}]`);
        // BE DEFENSIVE: arguments may be incomplete
        if (partialCall.arguments.location) {
          console.log(`  Location so far: ${partialCall.arguments.location}`);
        }
      }
      break;
    }
    
    case 'toolcall_end':
      console.log(`\nTool called: ${event.toolCall.name}`);
      console.log(`Arguments: ${JSON.stringify(event.toolCall.arguments)}`);
      break;
    
    case 'done':
      console.log(`\nFinished: ${event.reason}`);
      break;
  }
}

const message = await s.result();

// Handle tool calls: add the assistant message once, then one result per call
context.messages.push(message);
for (const block of message.content) {
  if (block.type === 'toolCall') {
    // executeWeatherApi is a placeholder for your own tool implementation
    const result = await executeWeatherApi(block.arguments);
    context.messages.push({
      role: 'toolResult',
      toolCallId: block.id,
      toolName: block.name,
      content: [{ type: 'text', text: JSON.stringify(result) }],
      isError: false,
      timestamp: Date.now()
    });
  }
}
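
When more than one tool is registered, the result-building step above can be generalized. The following is a sketch under stated assumptions: `runToolCalls`, `ToolHandler`, and the local block and message types are hypothetical names defined here for illustration, and only the toolResult fields shown in the example above are taken from this page.

```typescript
// Assumed, simplified shapes; the library's own types may differ.
type ToolCallBlock = { type: 'toolCall'; id: string; name: string; arguments: Record<string, unknown> };
type ContentBlock = ToolCallBlock | { type: 'text'; text: string };
type ToolHandler = (args: Record<string, unknown>) => Promise<unknown>;

interface ToolResultMessage {
  role: 'toolResult';
  toolCallId: string;
  toolName: string;
  content: { type: 'text'; text: string }[];
  isError: boolean;
  timestamp: number;
}

// Run every tool call in a message and build one toolResult per call.
// Unknown tools and thrown errors become results with isError: true.
async function runToolCalls(
  content: ContentBlock[],
  handlers: Record<string, ToolHandler>
): Promise<ToolResultMessage[]> {
  const results: ToolResultMessage[] = [];
  for (const block of content) {
    if (block.type !== 'toolCall') continue;
    let text = '';
    let isError = false;
    try {
      const handler = handlers[block.name];
      if (!handler) throw new Error(`Unknown tool: ${block.name}`);
      text = JSON.stringify((await handler(block.arguments)) ?? null);
    } catch (err) {
      isError = true;
      text = err instanceof Error ? err.message : String(err);
    }
    results.push({
      role: 'toolResult',
      toolCallId: block.id,
      toolName: block.name,
      content: [{ type: 'text', text }],
      isError,
      timestamp: Date.now(),
    });
  }
  return results;
}
```

The returned messages can then be appended to context.messages after the assistant message, in the same order as the tool calls.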

Partial Tool Arguments

During streaming, tool arguments are progressively parsed as they arrive:
for await (const event of s) {
  if (event.type === 'toolcall_delta') {
    const toolCall = event.partial.content[event.contentIndex];
    
    if (toolCall.type === 'toolCall' && toolCall.arguments) {
      // BE DEFENSIVE: arguments may be incomplete
      // Fields may be missing or truncated
      if (toolCall.name === 'write_file' && toolCall.arguments.path) {
        console.log(`Writing to: ${toolCall.arguments.path}`);
        
        // Content might be partial or missing
        if (toolCall.arguments.content) {
          // .length counts UTF-16 code units, not bytes
          console.log(`Characters so far: ${toolCall.arguments.content.length}`);
        }
      }
    }
  }
  
  if (event.type === 'toolcall_end') {
    // Here toolCall.arguments is complete (but not yet validated)
    const toolCall = event.toolCall;
    console.log(`Tool completed: ${toolCall.name}`, toolCall.arguments);
  }
}
Important notes:
  • During toolcall_delta, arguments contains a best-effort parse of the partial JSON
  • Fields may be missing, incomplete, or truncated mid-word
  • Arrays and nested objects may be partially populated
  • At minimum, arguments will be an empty object {}, never undefined
  • The Google provider does not support function call streaming; you get a single toolcall_delta with the full arguments
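
Because any field can be missing or of an unexpected type while arguments are still streaming, it can help to read them through a small guard. This is a minimal sketch: `readString` and `describeProgress` are hypothetical helpers, and the `path` and `content` field names are borrowed from the write_file example above.

```typescript
// Partial arguments are treated as an untyped record (an assumption for this sketch).
type PartialArgs = Record<string, unknown>;

// Return the field only if it is already present as a string.
function readString(args: PartialArgs, key: string): string | undefined {
  const value = args[key];
  return typeof value === 'string' ? value : undefined;
}

// Describe how much of a write_file style call has arrived so far.
function describeProgress(args: PartialArgs): string {
  const path = readString(args, 'path');
  if (path === undefined) return 'waiting for path';
  const content = readString(args, 'content');
  return content === undefined
    ? `path so far: ${path}`
    : `path so far: ${path}, ${content.length} chars of content`;
}

console.log(describeProgress({}));                                // waiting for path
console.log(describeProgress({ path: '/tmp/a' }));                // path so far: /tmp/a
console.log(describeProgress({ path: '/tmp/a', content: 'abc' })); // path so far: /tmp/a, 3 chars of content
```

The wording "so far" is deliberate: a value seen during toolcall_delta may still grow before toolcall_end.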

Streaming Thinking/Reasoning

Models with reasoning capabilities emit thinking events:
import { getModel, streamSimple } from '@mariozechner/pi-ai';

const model = getModel('anthropic', 'claude-sonnet-4-20250514');
const context = {
  messages: [{ role: 'user', content: 'Solve: 2x + 5 = 13' }]
};

const s = streamSimple(model, context, {
  reasoning: 'medium' // 'minimal' | 'low' | 'medium' | 'high' | 'xhigh'
});

for await (const event of s) {
  switch (event.type) {
    case 'thinking_start':
      console.log('[Model is thinking...]');
      break;
    
    case 'thinking_delta':
      // Stream thinking content in real-time
      process.stdout.write(event.delta);
      break;
    
    case 'thinking_end':
      console.log('\n[Thinking complete]');
      break;
    
    case 'text_delta':
      process.stdout.write(event.delta);
      break;
  }
}

const message = await s.result();

// Access thinking and text blocks
for (const block of message.content) {
  if (block.type === 'thinking') {
    console.log('Thinking:', block.thinking);
  } else if (block.type === 'text') {
    console.log('Response:', block.text);
  }
}

Stop Reasons

Every completed stream includes a stop reason:
const message = await s.result();

switch (message.stopReason) {
  case 'stop':
    console.log('Normal completion');
    break;
  case 'length':
    console.log('Hit max token limit');
    break;
  case 'toolUse':
    console.log('Model is calling tools');
    break;
  case 'error':
    console.log('Error occurred:', message.errorMessage);
    break;
  case 'aborted':
    console.log('Request was cancelled');
    break;
}
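
In an agent loop, the stop reason usually decides what happens next. The sketch below shows one possible policy; the `NextStep` names and the `nextStep` function are hypothetical and not part of the library, and the `StopReason` union simply lists the five values shown above.

```typescript
// The five stop reasons listed above.
type StopReason = 'stop' | 'length' | 'toolUse' | 'error' | 'aborted';

// Hypothetical follow-up actions for an application-level loop.
type NextStep = 'finish' | 'runTools' | 'continueOrTruncate' | 'fail';

function nextStep(reason: StopReason): NextStep {
  switch (reason) {
    case 'stop':
      return 'finish'; // normal completion, nothing more to do
    case 'toolUse':
      return 'runTools'; // execute the tool calls, then send the results back
    case 'length':
      return 'continueOrTruncate'; // output hit the token limit
    case 'error':
    case 'aborted':
      return 'fail'; // inspect errorMessage and any partial content
  }
}

console.log(nextStep('toolUse')); // runTools
```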

Error Handling

const s = stream(model, context);

for await (const event of s) {
  if (event.type === 'error') {
    // event.reason is either "error" or "aborted"
    console.error(`Error (${event.reason}):`, event.error.errorMessage);
    
    // Partial content received before error
    console.log('Partial content:', event.error.content);
    
    // Partial usage and cost
    console.log('Tokens used:', event.error.usage);
  }
}

const message = await s.result();
if (message.stopReason === 'error' || message.stopReason === 'aborted') {
  console.error('Request failed:', message.errorMessage);
}

Aborting Requests

Cancel in-progress requests with an abort signal:
import { getModel, stream } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-4o-mini');
const controller = new AbortController();

// Abort after 2 seconds
setTimeout(() => controller.abort(), 2000);

const s = stream(model, {
  messages: [{ role: 'user', content: 'Write a long story' }]
}, {
  signal: controller.signal
});

for await (const event of s) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  } else if (event.type === 'error') {
    console.log(`${event.reason === 'aborted' ? 'Aborted' : 'Error'}:`, event.error.errorMessage);
  }
}

const response = await s.result();
if (response.stopReason === 'aborted') {
  console.log('Request was aborted');
  console.log('Partial content:', response.content);
  console.log('Tokens used:', response.usage);
}
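
The timeout in the example above keeps its timer alive even if the request finishes early. A small wrapper can return both the signal and a way to clear the timer. This is a sketch using only the standard AbortController and timer APIs; `abortAfter` is a hypothetical helper name.

```typescript
// Create a signal that aborts after `ms` milliseconds, plus a cancel
// function that clears the pending timer once the request has finished.
function abortAfter(ms: number): { signal: AbortSignal; cancel: () => void } {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  return {
    signal: controller.signal,
    cancel: () => clearTimeout(timer),
  };
}

const timeout = abortAfter(10000);
console.log(timeout.signal.aborted); // false
timeout.cancel(); // no abort will fire after this
```

In practice, pass `timeout.signal` as the `signal` option and call `timeout.cancel()` after `await s.result()` so a completed request does not leave a timer behind.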

Continuing After Abort

Aborted messages can be added to context and continued:
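import { complete, getModel } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-4o-mini');
const context = {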
  messages: [
    { role: 'user', content: 'Explain quantum computing' }
  ]
};

// First request gets aborted
const controller = new AbortController();
setTimeout(() => controller.abort(), 2000);

const partial = await complete(model, context, { signal: controller.signal });

// Add partial response to context
context.messages.push(partial);
context.messages.push({ role: 'user', content: 'Please continue' });

// Continue the conversation
const continuation = await complete(model, context);

Stream Options

Common options for streaming requests:
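import { getModel, stream } from '@mariozechner/pi-ai';

const model = getModel('openai', 'gpt-4o-mini');
const controller = new AbortController();
// context is built as in Basic Streaming

const s = stream(model, context, {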
  temperature: 0.7,
  maxTokens: 2000,
  apiKey: 'sk-...', // Override environment variable
  signal: controller.signal, // Abort signal
  sessionId: 'session-123', // Session-based caching (where supported)
  cacheRetention: 'long', // 'none' | 'short' | 'long'
  transport: 'websocket', // 'sse' | 'websocket' | 'auto' (OpenAI Codex)
  headers: {
    'X-Custom-Header': 'value'
  },
  metadata: {
    user_id: 'user-123' // Provider-specific metadata
  },
  onPayload: (payload) => {
    // Inspect request payload before sending
    console.log('Payload:', JSON.stringify(payload, null, 2));
  }
});

Simplified Streaming API

For reasoning-capable models, use streamSimple with unified options:
import { getModel, streamSimple } from '@mariozechner/pi-ai';

// Works across all reasoning-capable providers
const model = getModel('anthropic', 'claude-sonnet-4-20250514');
// or getModel('openai', 'gpt-5-mini')
// or getModel('google', 'gemini-2.5-flash')
// or getModel('xai', 'grok-code-fast-1')

const s = streamSimple(model, context, {
  reasoning: 'medium', // Unified reasoning level
  temperature: 0.7,
  maxTokens: 2000
});

for await (const event of s) {
  // Same event types as stream()
}
Reasoning levels:
  • minimal - Very quick, basic reasoning
  • low - Quick reasoning
  • medium - Balanced reasoning (default)
  • high - Deep reasoning
  • xhigh - Maximum reasoning (OpenAI only, maps to high on other providers)
See Thinking/Reasoning for details.
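
The xhigh fallback described in the list can be pictured as a simple mapping. This sketch only illustrates the documented behavior; `effectiveLevel` is a hypothetical function, not the library's internal code, and it assumes the provider is identified by the same string passed to getModel.

```typescript
type ReasoningLevel = 'minimal' | 'low' | 'medium' | 'high' | 'xhigh';

// xhigh is described as OpenAI only; on other providers it maps to high.
function effectiveLevel(provider: string, level: ReasoningLevel): ReasoningLevel {
  if (level === 'xhigh' && provider !== 'openai') return 'high';
  return level;
}

console.log(effectiveLevel('openai', 'xhigh'));    // xhigh
console.log(effectiveLevel('anthropic', 'xhigh')); // high
console.log(effectiveLevel('google', 'low'));      // low
```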

Usage and Cost Tracking

const s = stream(model, context);

for await (const event of s) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  }
}

const message = await s.result();

// Token usage
console.log('Input tokens:', message.usage.input);
console.log('Output tokens:', message.usage.output);
console.log('Cache read:', message.usage.cacheRead);
console.log('Cache write:', message.usage.cacheWrite);
console.log('Total tokens:', message.usage.totalTokens);

// Cost breakdown
console.log('Input cost:', message.usage.cost.input);
console.log('Output cost:', message.usage.cost.output);
console.log('Cache read cost:', message.usage.cost.cacheRead);
console.log('Cache write cost:', message.usage.cost.cacheWrite);
console.log('Total cost:', message.usage.cost.total);
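
For multi-turn conversations it is often useful to total usage across several messages. The sketch below assumes a `Usage` shape containing exactly the fields logged above; `sumUsage` and `emptyUsage` are hypothetical helpers written for this example.

```typescript
// Assumed shapes, limited to the fields used in the example above.
interface Cost { input: number; output: number; cacheRead: number; cacheWrite: number; total: number }
interface Usage {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
  totalTokens: number;
  cost: Cost;
}

function emptyUsage(): Usage {
  return {
    input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0,
    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
  };
}

// Add up token counts and costs field by field.
function sumUsage(usages: Usage[]): Usage {
  const total = emptyUsage();
  for (const u of usages) {
    total.input += u.input;
    total.output += u.output;
    total.cacheRead += u.cacheRead;
    total.cacheWrite += u.cacheWrite;
    total.totalTokens += u.totalTokens;
    total.cost.input += u.cost.input;
    total.cost.output += u.cost.output;
    total.cost.cacheRead += u.cost.cacheRead;
    total.cost.cacheWrite += u.cost.cacheWrite;
    total.cost.total += u.cost.total;
  }
  return total;
}
```

A typical call would be `sumUsage(assistantMessages.map((m) => m.usage))`, where `assistantMessages` holds the results collected from `s.result()`.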

Next Steps

  • Tools: Learn about tool calling and validation
  • Thinking: Enable reasoning capabilities
