oRPC supports server-sent events (SSE) and streaming through async iterators. The same type safety that applies to regular procedures extends to streaming responses.

eventIterator()

Use the eventIterator() helper (re-exported from @orpc/contract) to declare the schema of each streamed event, then write the handler as an async generator:
import { eventIterator, os } from '@orpc/server'
import * as z from 'zod'

const subscribe = os
  .input(z.object({ topic: z.string() }))
  .output(
    eventIterator(
      z.object({ message: z.string(), timestamp: z.number() }),
    ),
  )
  .handler(async function* ({ input }) {
    while (true) {
      // waitForNextEvent is a placeholder for your own event source
      const event = await waitForNextEvent(input.topic)
      yield { message: event.text, timestamp: Date.now() }
    }
  })
On the client, iterating over the result gives you fully typed events:
for await (const event of await orpc.subscribe({ topic: 'news' })) {
  console.log(event.message, event.timestamp)
}
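Because the handler above loops forever, it helps to know how an async generator is torn down. The sketch below uses no oRPC APIs, only standard JavaScript iteration semantics: when a consumer stops iterating, the generator's finally block runs, which is a natural place to release resources. The names ticker, consumeThree, and log are hypothetical and exist only for illustration.

```typescript
// Records side effects so the cleanup step can be observed.
const log: string[] = []

// Hypothetical stand-in for a streaming handler: emits an increasing counter.
async function* ticker(): AsyncGenerator<{ count: number }> {
  let count = 0
  try {
    while (true) {
      // Simulate waiting for the next event.
      await new Promise<void>(resolve => setTimeout(resolve, 1))
      yield { count: count++ }
    }
  } finally {
    // Runs when the consumer stops iterating (break, return, or throw).
    log.push('cleanup')
  }
}

// Consumes three events, then stops; the break ends the generator.
async function consumeThree(): Promise<number[]> {
  const seen: number[] = []
  for await (const event of ticker()) {
    seen.push(event.count)
    if (seen.length === 3) break
  }
  return seen
}
```

Wrapping the loop body of a long-lived handler in try/finally in the same way keeps cleanup in one place, regardless of how the iteration ends.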

EventPublisher

For pub/sub patterns, use EventPublisher to broadcast events to multiple concurrent subscribers:
import { eventIterator, EventPublisher, os } from '@orpc/server'
import * as z from 'zod'

// A shared publisher — typically stored in app state or a Durable Object
const publisher = new EventPublisher<{
  planetUpdated: { id: number; name: string }
}>()

// Subscribe procedure: streams events to the client
const subscribe = os
  .output(eventIterator(z.object({ id: z.number(), name: z.string() })))
  .handler(async function* ({ signal }) {
    yield* publisher.subscribe('planetUpdated', { signal })
  })

// Publish procedure: pushes an event to all subscribers
const publishUpdate = os
  .input(z.object({ id: z.number(), name: z.string() }))
  .handler(async ({ input }) => {
    publisher.publish('planetUpdated', input)
  })
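To make the broadcast pattern above more concrete, here is a minimal, self-contained sketch of a typed publisher that fans events out to several async iterators. It is not oRPC's EventPublisher implementation and does not handle abort signals; TinyPublisher and everything inside it are hypothetical names chosen for illustration.

```typescript
type Listener<T> = (payload: T) => void

// Hypothetical, simplified publisher. This is NOT oRPC's EventPublisher.
class TinyPublisher<Events extends Record<string, unknown>> {
  private listeners = new Map<keyof Events, Set<Listener<any>>>()

  // Deliver the payload to every listener currently registered for the event.
  publish<K extends keyof Events>(event: K, payload: Events[K]): void {
    this.listeners.get(event)?.forEach(listener => listener(payload))
  }

  // Register a listener immediately and return an iterator over its private queue.
  subscribe<K extends keyof Events>(event: K): AsyncGenerator<Events[K]> {
    const queue: Events[K][] = []
    let wake: (() => void) | undefined
    const listener: Listener<Events[K]> = (payload) => {
      queue.push(payload)
      wake?.()
    }
    const registered = this.listeners.get(event) ?? new Set<Listener<any>>()
    this.listeners.set(event, registered)
    registered.add(listener)

    async function* iterate(): AsyncGenerator<Events[K]> {
      try {
        while (true) {
          while (queue.length === 0) {
            // Sleep until the listener pushes a new payload.
            await new Promise<void>((resolve) => { wake = resolve })
          }
          yield queue.shift() as Events[K]
        }
      } finally {
        // Unregister once a started iteration ends (break, return, or throw).
        registered.delete(listener)
      }
    }
    return iterate()
  }
}
```

Each subscriber gets its own queue, so a slow consumer does not block the others. A production implementation would also need to bound that queue and unregister subscribers that never start iterating.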

Event metadata

You can attach SSE-specific metadata (id, event, retry) to individual events using withEventMeta:
import { eventIterator, os, withEventMeta } from '@orpc/server'
import * as z from 'zod'

const subscribe = os
  .output(eventIterator(z.string()))
  .handler(async function* () {
    yield withEventMeta('hello', {
      id: '1',
      event: 'greeting',
      retry: 5000,
    })
  })
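The three metadata fields correspond to field names in the standard SSE wire format. As a rough illustration of what such an event could look like on the wire, the sketch below serializes one message by hand. encodeSseMessage and SseMeta are hypothetical, and JSON-encoding the payload is an assumption; oRPC's actual encoding may differ.

```typescript
interface SseMeta {
  id?: string
  event?: string
  retry?: number
}

// Hypothetical helper: serializes one message using standard SSE field names.
// Assumption: the payload is JSON-encoded onto a single "data:" line.
function encodeSseMessage(
  data: string | number | boolean | object | null,
  meta: SseMeta = {},
): string {
  const lines: string[] = []
  if (meta.event !== undefined) lines.push(`event: ${meta.event}`)
  if (meta.id !== undefined) lines.push(`id: ${meta.id}`)
  if (meta.retry !== undefined) lines.push(`retry: ${meta.retry}`)
  lines.push(`data: ${JSON.stringify(data)}`)
  // A blank line terminates the message.
  return lines.join('\n') + '\n\n'
}

const wire = encodeSseMessage('hello', { id: '1', event: 'greeting', retry: 5000 })
// wire === 'event: greeting\nid: 1\nretry: 5000\ndata: "hello"\n\n'
```

In the SSE specification, id lets a reconnecting client report the last event it received, event names the event type, and retry sets the reconnection delay in milliseconds.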

Stream utilities

@orpc/server re-exports several stream conversion utilities:
import {
  eventIteratorToStream,       // AsyncIterator → ReadableStream
  eventIteratorToUnproxiedDataStream, // for raw data streams
  streamToEventIterator,       // ReadableStream → AsyncIterator
} from '@orpc/server'
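For intuition about what these conversions involve, here is a minimal sketch built only on the standard Streams API. iteratorToStream and streamToIterator are hypothetical helpers, not the oRPC utilities listed above, which may handle event metadata and errors differently.

```typescript
// Hypothetical helper: AsyncIterator → ReadableStream, pulling one value at a time.
function iteratorToStream<T>(iterator: AsyncIterator<T>): ReadableStream<T> {
  return new ReadableStream<T>({
    async pull(controller) {
      const result = await iterator.next()
      if (result.done) controller.close()
      else controller.enqueue(result.value)
    },
    async cancel() {
      // Let the source run its cleanup when the stream is cancelled.
      await iterator.return?.()
    },
  })
}

// Hypothetical helper: ReadableStream → async generator.
async function* streamToIterator<T>(stream: ReadableStream<T>): AsyncGenerator<T> {
  const reader = stream.getReader()
  try {
    while (true) {
      const result = await reader.read()
      if (result.done) return
      yield result.value
    }
  } finally {
    // Cancel the stream if the consumer stops early, then release the lock.
    await reader.cancel()
    reader.releaseLock()
  }
}
```

Using pull rather than eagerly draining the iterator means the source only advances when the stream's consumer asks for more data.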
Streaming procedures are not supported inside batch requests. Call them directly.
