Skip to main content

Overview

Motia provides built-in support for real-time data streaming using Server-Sent Events (SSE) and WebSocket-like streams. This enables you to push updates to clients as they happen.

Server-Sent Events (SSE)

SSE is perfect for one-way server-to-client streaming, like live updates, progress notifications, or streaming AI responses.
1

Create an SSE endpoint

steps/sse-example.step.ts
import { type Handlers, http, type StepConfig } from 'motia'

// Step configuration: exposes a POST /stream HTTP endpoint in the 'streaming'
// flow. `as const satisfies StepConfig` validates the shape against StepConfig
// while preserving literal types, so Handlers<typeof config> stays precise.
export const config = {
  name: 'SSE Example',
  description: 'Stream data to clients using Server-Sent Events',
  flows: ['streaming'],
  // Triggered by an HTTP POST to /stream.
  triggers: [http('POST', '/stream')],
  // This step enqueues no follow-up work.
  enqueues: [],
} as const satisfies StepConfig

/**
 * Streams a batch of generated items to the client as Server-Sent Events.
 *
 * Reads a URL-encoded request body, generates items from it, and writes one
 * `item` event per item (with a randomized delay between writes) followed by
 * a final `done` event carrying the total count, then closes the stream.
 */
export const handler: Handlers<typeof config> = async (
  { request, response },
  { logger }
) => {
  logger.info('SSE stream requested')

  // SSE requires a text/event-stream content type and caching disabled.
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  // Accumulate the raw request body from the incoming byte stream.
  let rawBody = ''
  for await (const chunk of request.requestBody.stream) {
    rawBody += Buffer.from(chunk).toString('utf-8')
  }

  // Body is URL-encoded form data; decode it into a plain object.
  const data = Object.fromEntries(new URLSearchParams(rawBody.trim()))

  logger.info('Processing SSE request', { data })

  const items = generateItems(data)

  // Emit one SSE `item` event per generated item, pausing between writes
  // to simulate per-item processing time.
  for (const item of items) {
    response.stream.write(
      `event: item\ndata: ${JSON.stringify(item)}\n\n`
    )
    await sleep(300 + Math.random() * 700)
  }

  // Signal completion with a final `done` event, then end the stream.
  response.stream.write(
    `event: done\ndata: ${JSON.stringify({ total: items.length })}\n\n`
  )
  response.close()
}

/** Resolves after roughly `ms` milliseconds; used to pace SSE writes. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms)
  })
}

/**
 * Builds a random-sized batch (5–10) of demo items.
 * Each item gets a timestamp-based id, echoes the request `data`, and
 * records its creation time as an ISO-8601 string.
 */
function generateItems(data: Record<string, string>) {
  const total = 5 + Math.floor(Math.random() * 6)
  const items: Array<{ id: string; data: Record<string, string>; timestamp: string }> = []
  for (let i = 0; i < total; i++) {
    items.push({
      id: `item-${Date.now()}-${i}`,
      data,
      timestamp: new Date().toISOString(),
    })
  }
  return items
}
2

Connect from the client

Note: the browser's EventSource API always issues GET requests, so it cannot directly call the POST /stream endpoint defined above. Either expose the stream over GET as well, or use fetch() and read the streamed response body. The example below assumes the endpoint is reachable via GET.

client.js
// Open the SSE connection to the streaming endpoint.
const eventSource = new EventSource('http://localhost:3000/stream')

// Handle each streamed item as it arrives.
function handleItem(event) {
  const item = JSON.parse(event.data)
  console.log('Received item:', item)
  // Update UI with new item
}

// The server's `done` event signals the end of the stream.
function handleDone(event) {
  const summary = JSON.parse(event.data)
  console.log('Stream complete:', summary)
  eventSource.close()
}

eventSource.addEventListener('item', handleItem)
eventSource.addEventListener('done', handleDone)

// Close on error to stop EventSource's automatic reconnect attempts.
eventSource.onerror = (error) => {
  console.error('SSE error:', error)
  eventSource.close()
}

SSE with Python

You can also build SSE endpoints using Motia’s Python SDK:
steps/sse_step.py
import asyncio
import json
import random
from typing import Any

from motia import MotiaHttpArgs, FlowContext, http

# Step configuration: exposes a POST /stream HTTP endpoint in the
# "streaming" flow; this step enqueues no follow-up work.
config = {
    "name": "SSE Example",
    "description": "Stream data using Server-Sent Events",
    "flows": ["streaming"],
    "triggers": [http("POST", "/stream")],
    "enqueues": [],
}

async def handler(args: MotiaHttpArgs[Any], ctx: FlowContext[Any]) -> None:
    """Stream generated demo items to the client as Server-Sent Events.

    Sets SSE headers, drains the request body, then writes one ``item``
    event per generated item (with a randomized pause between writes)
    followed by a ``done`` event carrying the total count.
    """
    request = args.request
    response = args.response

    ctx.logger.info("SSE stream requested")

    # Set up SSE headers
    await response.status(200)
    await response.headers({
        "content-type": "text/event-stream",
        "cache-control": "no-cache",
        "connection": "keep-alive",
    })

    # Read request body
    # NOTE(review): raw_chunks is collected but never used below — unlike
    # the TypeScript example, the parsed body is not fed into item
    # generation. Draining may be needed to free the stream; confirm.
    raw_chunks: list[str] = []
    async for chunk in request.request_body.stream:
        if isinstance(chunk, bytes):
            raw_chunks.append(chunk.decode("utf-8", errors="replace"))
        else:
            raw_chunks.append(str(chunk))

    # Generate items
    items = _generate_items()

    # Stream items
    # NOTE(review): writer.stream.write is called without await (while
    # status/headers are awaited) — presumably a synchronous buffered
    # writer; verify against the SDK.
    for item in items:
        data = json.dumps(item)
        response.writer.stream.write(
            f"event: item\ndata: {data}\n\n".encode("utf-8")
        )
        # Randomized pause (0.3–1.0 s) to simulate per-item processing.
        await asyncio.sleep(0.3 + random.random() * 0.7)

    # Send completion
    done_data = json.dumps({"total": len(items)})
    response.writer.stream.write(
        f"event: done\ndata: {done_data}\n\n".encode("utf-8")
    )
    response.close()

def _generate_items() -> list[dict[str, Any]]:
    return [
        {
            "id": f"item-{i}",
            "value": random.randint(1, 100),
        }
        for i in range(10)
    ]

Streaming AI responses

SSE is ideal for streaming LLM responses to clients:
steps/ai-chat.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import { z } from 'zod'

// Step configuration: exposes POST /chat in the 'ai' flow. The zod
// bodySchema validates the incoming JSON body, so the handler can rely
// on `message` being a string and `conversationId` optional.
export const config = {
  name: 'AI Chat Stream',
  description: 'Stream AI responses to clients',
  flows: ['ai'],
  triggers: [
    http('POST', '/chat', {
      bodySchema: z.object({
        message: z.string(),
        conversationId: z.string().optional(),
      }),
    }),
  ],
} as const satisfies StepConfig

/**
 * Streams an LLM completion to the client as Server-Sent Events.
 *
 * Emits one `token` event per non-empty content delta, then a `done`
 * event on success; on failure, logs and emits a single `error` event.
 * The stream is closed in all cases.
 *
 * NOTE(review): `openai` is referenced but never imported or constructed
 * in this snippet — an initialized OpenAI client must be provided
 * elsewhere in the real file for this to run.
 */
export const handler: Handlers<typeof config> = async (
  { request, response },
  { logger }
) => {
  // Body is schema-validated by the trigger's zod bodySchema.
  const { message, conversationId } = request.body

  logger.info('AI chat requested', { message, conversationId })

  // Set up SSE
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  try {
    // Call your LLM (example with OpenAI)
    const stream = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [{ role: 'user', content: message }],
      stream: true,
    })

    // Forward each content delta to the client as a `token` event.
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || ''
      
      if (content) {
        response.stream.write(
          `event: token\ndata: ${JSON.stringify({ content })}\n\n`
        )
      }
    }

    response.stream.write(
      `event: done\ndata: ${JSON.stringify({ finished: true })}\n\n`
    )
  } catch (error) {
    // Surface failures to the client as an `error` event; the raw error
    // stays server-side in the logs.
    logger.error('AI streaming failed', { error })
    response.stream.write(
      `event: error\ndata: ${JSON.stringify({ error: 'Stream failed' })}\n\n`
    )
  }

  // Close the stream on both the success and error paths.
  response.close()
}

Real-time state streams

Motia provides built-in state streams for real-time data synchronization:
steps/todo.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'

// Shape of a single todo item carried over the stream. `completedAt`
// is absent until the todo is completed; timestamps are ISO-8601 strings.
const todoSchema = z.object({
  id: z.string(),
  description: z.string(),
  createdAt: z.string(),
  completedAt: z.string().optional(),
})

// Stream configuration for the 'todo' state stream: clients subscribing
// to this stream receive todo updates validated against todoSchema.
export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'todo',
  schema: todoSchema,

  // Called when a client subscribes. Returning { unauthorized: false }
  // admits the client unconditionally — no auth check is performed here.
  onJoin: async (subscription, context, authContext) => {
    // Track active subscribers
    // NOTE(review): bumps a 'watching' counter on a separate 'inbox'
    // stream keyed by the subscriber's groupId — confirm that stream
    // and the 'watching' path exist in the project.
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'increment', path: 'watching', by: 1 },
    ])

    context.logger.info('Client joined stream', {
      subscription,
      authContext,
    })
    
    return { unauthorized: false }
  },

  // Called when a client unsubscribes; mirrors onJoin by decrementing
  // the same subscriber counter.
  onLeave: async (subscription, context, authContext) => {
    // Update subscriber count
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'decrement', path: 'watching', by: 1 },
    ])

    context.logger.info('Client left stream', { subscription })
  },
}

// TypeScript type for one todo item, inferred from the zod schema above.
export type Todo = z.infer<typeof todoSchema>
Use the stream in your steps:
steps/create-todo.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import { z } from 'zod'
import type { Todo } from './todo.stream'

// Step configuration: exposes POST /todo in the 'todo-app' flow; the
// zod bodySchema guarantees a string `description` in the request body.
export const config = {
  name: 'CreateTodo',
  flows: ['todo-app'],
  triggers: [
    http('POST', '/todo', {
      bodySchema: z.object({ 
        description: z.string() 
      }),
    }),
  ],
} as const satisfies StepConfig

/**
 * Creates a todo, pushes it to all clients subscribed to the 'todo'
 * stream, persists it to state, and returns it in the HTTP response.
 */
export const handler: Handlers<typeof config> = async (
  { request },
  { logger, streams, state }
) => {
  const { description } = request.body

  // Timestamp-based id keeps ids unique enough for this example.
  const id = `todo-${Date.now()}`
  const todo: Todo = {
    id,
    description,
    createdAt: new Date().toISOString(),
  }

  // Push to the 'inbox' group of the todo stream — every connected
  // subscriber receives this update in real time.
  await streams.todo.set('inbox', id, todo)

  // Persist the same record to durable state.
  await state.set('todos', id, todo)

  logger.info('Todo created and streamed', { todoId: id })

  return { status: 200, body: todo }
}

Progress updates

Stream progress updates for long-running operations:
/**
 * Streams progress updates for a simulated long-running job.
 *
 * Emits one `progress` SSE event per phase (with step name, percentage,
 * and position), then a final `complete` event, and closes the stream.
 */
export const handler: Handlers<typeof config> = async (
  { response },
  { logger }
) => {
  // SSE requires a text/event-stream content type and caching disabled.
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  // Simulated phases of the job and how long each one takes (ms).
  const steps = [
    { name: 'Initializing', duration: 1000 },
    { name: 'Processing data', duration: 3000 },
    { name: 'Generating report', duration: 2000 },
    { name: 'Finalizing', duration: 1000 },
  ]

  // entries() yields [index, step] pairs, avoiding unchecked indexing.
  for (const [i, step] of steps.entries()) {
    const progress = ((i + 1) / steps.length) * 100

    response.stream.write(
      `event: progress\ndata: ${JSON.stringify({
        step: step.name,
        progress: Math.round(progress),
        current: i + 1,
        total: steps.length,
      })}\n\n`
    )

    // Inline delay so this snippet is self-contained — the original
    // called a `sleep` helper that is not defined in this example.
    await new Promise<void>((resolve) => setTimeout(resolve, step.duration))
  }

  response.stream.write(
    `event: complete\ndata: ${JSON.stringify({ success: true })}\n\n`
  )
  response.close()
}

Best practices

Set proper headers

Always set content-type: text/event-stream and cache-control: no-cache for SSE.

Handle client disconnects

Check connection status periodically. Clean up resources when clients disconnect.

Send heartbeats

Send periodic ping events to keep connections alive and detect disconnects.

Limit concurrent streams

Monitor and limit the number of concurrent streaming connections per client.

Next steps

AI Integration

Build AI-powered workflows with streaming responses

Multi-language

Mix TypeScript and Python in one project

Build docs developers (and LLMs) love