Skip to main content
Motia provides two powerful ways to build real-time features: Server-Sent Events (SSE) for streaming HTTP responses, and Streams for real-time data synchronization across clients.

Server-Sent Events (SSE)

Stream data from your API endpoints to clients in real-time:
// steps/sse.step.ts
import { type Handlers, http, type StepConfig } from 'motia'

export const config = {
  name: 'SSE Example',
  description: 'Streams random items back to the client',
  flows: ['sse-example'],
  triggers: [http('POST', '/sse')],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request, response }, { logger }) => {
  logger.info('Starting SSE stream')
  
  // Set SSE headers
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    'connection': 'keep-alive',
  })
  
  // Read request body
  const chunks: string[] = []
  for await (const chunk of request.requestBody.stream) {
    chunks.push(Buffer.from(chunk).toString('utf-8'))
  }
  
  const body = chunks.join('').trim()
  const params: Record<string, string> = {}
  for (const [key, value] of new URLSearchParams(body)) {
    params[key] = value
  }
  
  // Generate and stream items
  const items = generateRandomItems(params)
  
  for (const item of items) {
    response.stream.write(`event: item\ndata: ${JSON.stringify(item)}\n\n`)
    await sleep(300 + Math.random() * 700)
  }
  
  response.stream.write(`event: done\ndata: ${JSON.stringify({ total: items.length })}\n\n`)
  response.close()
}

function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

function generateRandomItems(params: Record<string, string>) {
  const fields = Object.entries(params).map(([name, value]) => ({ name, value }))
  const count = 5 + Math.floor(Math.random() * 6)
  const adjectives = ['swift', 'lazy', 'bold', 'calm', 'fierce', 'gentle', 'sharp', 'wild']
  const nouns = ['falcon', 'river', 'mountain', 'crystal', 'thunder', 'shadow', 'ember', 'frost']

  return Array.from({ length: count }, (_, i) => ({
    id: `item-${Date.now()}-${i}`,
    label: `${adjectives[Math.floor(Math.random() * adjectives.length)]} ${nouns[Math.floor(Math.random() * nouns.length)]}`,
    score: Math.round(Math.random() * 100),
    source: fields.length > 0 ? fields[i % fields.length] : null,
  }))
}

Streaming AI responses

Stream OpenAI responses in real-time:
import OpenAI from 'openai'

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

export const handler: Handlers<typeof config> = async ({ request, response }, { logger }) => {
  const { message } = request.body
  
  logger.info('Streaming AI response', { message })
  
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    'connection': 'keep-alive',
  })
  
  const stream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: message }],
    stream: true,
  })
  
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || ''
    if (content) {
      response.stream.write(`data: ${JSON.stringify({ content })}\n\n`)
    }
  }
  
  response.stream.write('data: [DONE]\n\n')
  response.close()
}

Motia Streams

Motia Streams provide real-time data synchronization across clients with automatic state management.

Defining a stream

Create a stream configuration:
// streams/todo.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const todoSchema = z.object({
  id: z.string(),
  description: z.string(),
  createdAt: z.string(),
  dueDate: z.string().optional(),
  completedAt: z.string().optional(),
})

export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'todo',
  schema: todoSchema,

  onJoin: async (subscription, context, authContext) => {
    // Track who's watching
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'increment', path: 'watching', by: 1 },
    ])

    context.logger.info('Todo stream joined', { subscription, authContext })
    return { unauthorized: false }
  },

  onLeave: async (subscription, context, authContext) => {
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'decrement', path: 'watching', by: 1 },
    ])

    context.logger.info('Todo stream left', { subscription, authContext })
  },
}

export type Todo = z.infer<typeof todoSchema>

Writing to streams

Update stream data from your Steps:
// steps/create-todo.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import { z } from 'zod'
import type { Todo } from './todo.stream'

export const config = {
  name: 'CreateTodo',
  description: 'Create a new todo item',
  flows: ['todo-app'],
  triggers: [
    http('POST', '/todo', {
      bodySchema: z.object({ 
        description: z.string(), 
        dueDate: z.string().optional() 
      }),
      responseSchema: {
        200: z.object({
          id: z.string(),
          description: z.string(),
          createdAt: z.string(),
        }),
      },
    }),
  ],
  enqueues: [],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { logger, streams, state }) => {
  logger.info('Creating new todo', { body: request.body })

  const { description, dueDate } = request.body
  const todoId = `todo-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`

  if (!description) {
    return { status: 400, body: { error: 'Description is required' } }
  }

  const newTodo: Todo = {
    id: todoId,
    description,
    createdAt: new Date().toISOString(),
    dueDate,
  }

  // Write to stream - all subscribed clients receive update
  const todo = await streams.todo.set('inbox', todoId, newTodo)

  // Also store in state
  await state.set('todos', todoId, newTodo)

  logger.info('Todo created successfully', { todoId })

  return { status: 200, body: todo.new_value }
}

Consuming streams in React

Subscribe to streams from your frontend:
import { useMotiaStream } from '@motia/stream-client-react'

function TodoList() {
  const { items, isConnected } = useMotiaStream({
    url: 'ws://localhost:8787',
    streamName: 'todo',
    groupId: 'inbox',
  })
  
  return (
    <div>
      <h2>Todos {isConnected ? '🟢' : '🔴'}</h2>
      <ul>
        {Array.from(items.values()).map(todo => (
          <li key={todo.id}>
            {todo.description}
            {todo.completedAt && ' ✓'}
          </li>
        ))}
      </ul>
    </div>
  )
}

Stream updates

Update existing stream items:
export const handler: Handlers<typeof config> = async ({ request }, { streams, logger }) => {
  const { todoId } = request.params
  
  await streams.todo.update('inbox', todoId, [
    { type: 'set', path: 'completedAt', value: new Date().toISOString() },
  ])
  
  logger.info('Todo marked as complete', { todoId })
  
  return { status: 200, body: { ok: true } }
}

Stream triggers

React to stream changes with stream triggers:
export const config = {
  name: 'OnTodoCreated',
  triggers: [
    {
      type: 'stream',
      streamName: 'todo',
      event: 'set',
    },
  ],
  enqueues: ['todo.analytics'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  const { groupId, itemId, new_value } = input
  
  logger.info('New todo created', { groupId, itemId, todo: new_value })
  
  await enqueue({
    topic: 'todo.analytics',
    data: {
      event: 'todo_created',
      todoId: itemId,
      timestamp: new Date().toISOString(),
    },
  })
}

Real-time updates with state triggers

Use state triggers for real-time notifications:
export const config = {
  name: 'OnOrderUpdate',
  triggers: [
    {
      type: 'state',
      namespace: 'orders',
      event: 'set',
    },
  ],
  enqueues: ['notification.send'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  const { key, new_value, old_value } = input
  
  if (old_value?.status !== new_value?.status) {
    logger.info('Order status changed', {
      orderId: key,
      oldStatus: old_value?.status,
      newStatus: new_value?.status,
    })
    
    await enqueue({
      topic: 'notification.send',
      data: {
        userId: new_value.userId,
        message: `Your order ${key} is now ${new_value.status}`,
      },
    })
  }
}

Stream authentication

Protect streams with authentication:
export const config: StreamConfig = {
  name: 'private-messages',
  schema: messageSchema,
  
  onJoin: async (subscription, context, authContext) => {
    const userId = authContext?.userId
    
    if (!userId) {
      return { unauthorized: true, reason: 'Not authenticated' }
    }
    
    // Check if user has access to this group
    const hasAccess = await checkUserAccess(userId, subscription.groupId)
    
    if (!hasAccess) {
      return { unauthorized: true, reason: 'Access denied' }
    }
    
    context.logger.info('User joined stream', { userId, groupId: subscription.groupId })
    return { unauthorized: false }
  },
}

Streams

Learn about Motia Streams

Stream triggers

React to stream changes

State triggers

React to state changes

AI agents

Build AI agents with streaming

Build docs developers (and LLMs) love