Durable Streams provides two modes for real-time streaming: long-polling and Server-Sent Events (SSE). Both modes support resumability and work seamlessly with CDN caching.

Live Modes Overview

Durable Streams supports two live modes: long-polling and SSE.

Setting live: true (the default) automatically selects the best mode: SSE for JSON streams, long-poll for binary streams.

// Automatically chooses SSE for JSON, long-poll for binary
const res = await stream.stream({ live: true })
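The selection rule can be illustrated with a small sketch. Note that pickLiveMode is a hypothetical helper, not part of the client API; the real client applies this choice internally based on the stream's content type:

```typescript
// Hypothetical illustration of the default mode selection:
// JSON streams get SSE, everything else falls back to long-poll.
function pickLiveMode(contentType: string): "sse" | "long-poll" {
  return contentType.includes("json") ? "sse" : "long-poll"
}

pickLiveMode("application/json")         // "sse"
pickLiveMode("application/octet-stream") // "long-poll"
```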

Long-Polling Mode

Long-polling waits for new data on each request, then returns and reconnects:
import { DurableStream } from "@durable-streams/client"

const stream = new DurableStream({
  url: "https://streams.example.com/v1/stream/notifications",
})

const res = await stream.stream({
  offset: "-1",
  live: "long-poll",
})

res.subscribeJson(async (batch) => {
  console.log(`Received ${batch.items.length} items`)
  
  for (const item of batch.items) {
    await handleNotification(item)
  }
  
  // Offset is automatically used for next poll
})

How Long-Polling Works

1. Client requests with offset

   GET /v1/stream/notifications?offset=100_2048&live=long-poll

2. Server waits for data

   If no new data exists, the server holds the connection open (default: 30 seconds).

3. Response with data or timeout

   • New data arrives: Server responds immediately with data and Stream-Next-Offset
   • Timeout: Server responds with Stream-Up-To-Date header (no body)

4. Client reconnects automatically

   The client automatically makes the next request using the latest offset.
Long-polling works through CDNs and reverse proxies, making it ideal for production deployments.
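The cycle above can be sketched as a single poll step. This is an illustrative model, not the client's actual implementation: fetchOnce stands in for the HTTP request, and PollResponse models the two outcomes described in the steps (new data with Stream-Next-Offset, or a Stream-Up-To-Date timeout):

```typescript
// Models one long-poll request/response cycle.
interface PollResponse {
  upToDate: boolean   // true when the server timed out with no new data
  nextOffset?: string // value of Stream-Next-Offset when data arrived
  body?: string       // response body when data arrived
}

async function pollOnce(
  offset: string,
  fetchOnce: (url: string) => Promise<PollResponse>
): Promise<{ offset: string; data: string | null }> {
  const res = await fetchOnce(
    `/v1/stream/notifications?offset=${offset}&live=long-poll`
  )
  if (res.upToDate) {
    // Timed out with no new data: poll again from the same offset
    return { offset, data: null }
  }
  // New data arrived: advance to the server-provided next offset
  return { offset: res.nextOffset!, data: res.body ?? null }
}
```

Running pollOnce in a loop, carrying the returned offset into the next call, is essentially what the client does for you on each reconnect.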

Server-Sent Events (SSE)

SSE provides a persistent connection with automatic reconnection:
const res = await stream.stream({
  offset: "-1",
  live: "sse",
})

res.subscribeJson(async (batch) => {
  for (const item of batch.items) {
    console.log("Live update:", item)
  }
})

SSE with Binary Data

For binary streams, SSE automatically base64-encodes data:
const stream = await DurableStream.create({
  url: "https://streams.example.com/v1/stream/images",
  contentType: "application/octet-stream",
})

const res = await stream.stream({ live: "sse" })

res.subscribeBytes(async (chunk) => {
  // Automatically decoded from base64
  console.log("Binary chunk:", chunk.data) // Uint8Array
})
The server sets Stream-SSE-Data-Encoding: base64 for binary content types. Clients automatically detect and decode the data.
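As a sketch of what that decoding involves (the client handles this for you), a base64 SSE data field can be turned back into raw bytes like so; this uses Node's Buffer, while in browsers atob() plus a byte loop does the same job:

```typescript
// Decode a base64-encoded SSE data field back into raw bytes.
function decodeSseChunk(b64: string): Uint8Array {
  return new Uint8Array(Buffer.from(b64, "base64"))
}
```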

SSE Resilience

Configure automatic reconnection behavior:
const res = await stream.stream({
  live: "sse",
  sseResilience: {
    maxReconnectAttempts: 10,
    reconnectDelayMs: 1000,
    maxReconnectDelayMs: 30000,
  },
})
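Assuming the reconnect delay grows exponentially from the configured floor up to the ceiling (a common backoff pattern; the client's exact schedule may differ), the delay per attempt would look like:

```typescript
// Hypothetical exponential-backoff schedule for SSE reconnects:
// the delay doubles each attempt and is capped at maxReconnectDelayMs.
function reconnectDelay(
  attempt: number,
  reconnectDelayMs = 1000,
  maxReconnectDelayMs = 30000
): number {
  return Math.min(reconnectDelayMs * 2 ** attempt, maxReconnectDelayMs)
}

reconnectDelay(0)  // 1000
reconnectDelay(3)  // 8000
reconnectDelay(10) // 30000 (capped)
```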

Subscriber Pattern

The subscriber pattern handles backpressure automatically:
const res = await stream.stream({
  live: "long-poll",
})

res.subscribeJson(async (batch) => {
  // This async function creates natural backpressure
  // Next batch won't arrive until this completes
  
  for (const item of batch.items) {
    await slowDatabaseWrite(item)
  }
  
  console.log(`Processed batch at offset: ${batch.offset}`)
})
If your subscriber function throws an error, the subscription stops. Implement error handling inside the subscriber to continue receiving updates.

Error Handling in Subscribers

res.subscribeJson(async (batch) => {
  try {
    for (const item of batch.items) {
      await processItem(item)
    }
  } catch (err) {
    console.error("Processing failed:", err)
    
    // Option 1: Skip and continue
    await logFailedBatch(batch, err)
    
    // Option 2: Retry with backoff
    await retryWithBackoff(() => processBatch(batch))
    
    // Option 3: Store for later reprocessing
    await storeFailedBatch(batch)
  }
})

ReadableStream Integration

For advanced use cases, access the underlying ReadableStream:
const res = await stream.stream({ live: false })
const readable = res.jsonStream()

// Use with TransformStream
const transformed = readable.pipeThrough(
  new TransformStream({
    transform(item, controller) {
      // Transform each item
      controller.enqueue({ ...item, processed: true })
    },
  })
)

// Pipe to destination
await transformed.pipeTo(destinationWritable)

Multi-viewer Synchronization

Share streams across multiple viewers in real-time:
import { DurableStream } from "@durable-streams/client"

class SharedStreamViewer {
  private viewers = new Set<(items: any[]) => void>()
  private currentOffset = "-1"

  constructor(private stream: DurableStream) {}

  async start() {
    const res = await this.stream.stream({
      offset: this.currentOffset,
      live: "sse",
    })

    res.subscribeJson(async (batch) => {
      this.currentOffset = batch.offset
      
      // Notify all viewers
      for (const viewer of this.viewers) {
        viewer(batch.items)
      }
    })
  }

  addViewer(callback: (items: any[]) => void) {
    this.viewers.add(callback)
    return () => this.viewers.delete(callback)
  }
}

// Usage
const sharedViewer = new SharedStreamViewer(stream)
await sharedViewer.start()

// Multiple components subscribe
const unsubscribe1 = sharedViewer.addViewer((items) => {
  console.log("Viewer 1:", items)
})

const unsubscribe2 = sharedViewer.addViewer((items) => {
  console.log("Viewer 2:", items)
})

Pause and Resume

Control subscription lifecycle with abort signals:
class PausableStreamReader {
  private abortController: AbortController | null = null
  private currentOffset = "-1"

  constructor(private stream: DurableStream) {}

  async start() {
    this.abortController = new AbortController()

    const res = await this.stream.stream({
      offset: this.currentOffset,
      live: "long-poll",
      signal: this.abortController.signal,
    })

    res.subscribeJson(async (batch) => {
      // Save offset for resumption
      this.currentOffset = batch.offset
      
      for (const item of batch.items) {
        await processItem(item)
      }
    })
  }

  pause() {
    this.abortController?.abort()
    this.abortController = null
  }

  async resume() {
    if (!this.abortController) {
      await this.start() // Resumes from saved offset
    }
  }
}

Dynamic Headers and Params

Refresh auth tokens automatically for long-running subscriptions:
import { DurableStream } from "@durable-streams/client"

const stream = new DurableStream({
  url: "https://streams.example.com/v1/stream/events",
  headers: {
    // Static header
    "X-Client-Version": "1.0.0",
    
    // Dynamic header - called for each request
    Authorization: async () => {
      const token = await getAuthToken()
      return `Bearer ${token}`
    },
  },
})

const res = await stream.stream({ live: "long-poll" })

// Auth token is automatically refreshed on each poll
res.subscribeJson(async (batch) => {
  // Process items
})
Header and param functions are called per-request, not per-session. In long-poll mode, this happens on every poll cycle, allowing fresh tokens to be fetched automatically.
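A sketch of how such a header map could be resolved before each request (resolveHeaders is illustrative, not the client's actual internal function): static strings pass through unchanged, while functions are invoked so a fresh value is fetched every poll.

```typescript
type HeaderValue = string | (() => string | Promise<string>)

// Resolve a header map per request: invoke function values,
// pass static strings through as-is.
async function resolveHeaders(
  headers: Record<string, HeaderValue>
): Promise<Record<string, string>> {
  const resolved: Record<string, string> = {}
  for (const [name, value] of Object.entries(headers)) {
    resolved[name] = typeof value === "function" ? await value() : value
  }
  return resolved
}
```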

Batching and Throughput

For high-throughput scenarios, batch processing improves efficiency:
const res = await stream.stream({ live: "long-poll" })

let buffer: any[] = []
const BATCH_SIZE = 100
const FLUSH_INTERVAL_MS = 1000

const flush = async () => {
  if (buffer.length === 0) return
  const items = buffer
  buffer = []
  await processBatch(items)
}

let flushTimer = setInterval(flush, FLUSH_INTERVAL_MS)

res.subscribeJson(async (batch) => {
  buffer.push(...batch.items)

  if (buffer.length >= BATCH_SIZE) {
    // Flush immediately and restart the interval timer
    clearInterval(flushTimer)
    await flush()
    flushTimer = setInterval(flush, FLUSH_INTERVAL_MS)
  }
})

Stream Closed Detection

Detect when a stream is closed by the writer:
const res = await stream.stream({ live: "long-poll" })

res.subscribeJson(async (batch) => {
  for (const item of batch.items) {
    await processItem(item)
  }
  
  if (batch.streamClosed) {
    console.log("Stream closed by writer at offset:", batch.offset)
    // Clean up resources
    await cleanup()
  }
})
When a stream is closed, readers receive the Stream-Closed: true header in responses. The streamClosed flag in batches indicates the end of the stream.

Testing Live Subscriptions

Test real-time behavior with controlled data arrival:
import { DurableStream } from "@durable-streams/client"

describe("Live subscriptions", () => {
  it("receives live updates", async () => {
    const stream = await DurableStream.create({
      url: "https://streams.example.com/v1/stream/test",
      contentType: "application/json",
    })

    const receivedItems: any[] = []

    const res = await stream.stream({ live: "long-poll" })
    
    res.subscribeJson(async (batch) => {
      receivedItems.push(...batch.items)
    })

    // Wait for subscription to start
    await new Promise((resolve) => setTimeout(resolve, 100))

    // Write test data
    await stream.append(JSON.stringify({ test: "data1" }))
    await stream.append(JSON.stringify({ test: "data2" }))

    // Wait for delivery
    await new Promise((resolve) => setTimeout(resolve, 500))

    expect(receivedItems).toHaveLength(2)
    expect(receivedItems[0]).toEqual({ test: "data1" })
  })
})

Next Steps

Error Handling

Implement robust error handling for production

Production Deployment

Deploy Durable Streams to production with CDN support
