Server-Sent Events (SSE)
Stream data from your API endpoints to clients in real time:
// steps/sse.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
// Step definition for the SSE example: a single HTTP trigger (POST /sse)
// grouped under the 'sse-example' flow.
export const config = {
name: 'SSE Example',
description: 'Streams random items back to the client',
flows: ['sse-example'],
triggers: [http('POST', '/sse')],
// Empty: the handler for this step never calls enqueue.
enqueues: [],
// `as const` keeps the literal types; `satisfies` checks the object against
// StepConfig without widening it.
} as const satisfies StepConfig
/**
 * Streams a list of random items to the client as Server-Sent Events.
 *
 * The request body is read in full and parsed as URL-encoded key/value pairs,
 * which are handed to `generateRandomItems`. Each item is written as an
 * `item` event with a random 300-1000 ms pause after it, followed by a final
 * `done` event carrying the number of items sent.
 *
 * The response is always closed, even when reading the body or writing to the
 * stream throws; the error itself still propagates to the caller.
 */
export const handler: Handlers<typeof config> = async ({ request, response }, { logger }) => {
  logger.info('Starting SSE stream')

  // Set SSE headers
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    'connection': 'keep-alive',
  })

  try {
    // Read request body. Raw chunks are concatenated BEFORE decoding so that a
    // multi-byte UTF-8 sequence split across two chunks is not corrupted.
    const chunks: Buffer[] = []
    for await (const chunk of request.requestBody.stream) {
      chunks.push(Buffer.from(chunk))
    }
    const body = Buffer.concat(chunks).toString('utf-8').trim()

    // For repeated keys the last value wins.
    const params: Record<string, string> = {}
    for (const [key, value] of new URLSearchParams(body)) {
      params[key] = value
    }

    // Generate and stream items
    const items = generateRandomItems(params)
    for (const item of items) {
      response.stream.write(`event: item\ndata: ${JSON.stringify(item)}\n\n`)
      await sleep(300 + Math.random() * 700)
    }
    response.stream.write(`event: done\ndata: ${JSON.stringify({ total: items.length })}\n\n`)
  } finally {
    // The 200 status and SSE headers are already set, so the only way to end
    // the exchange on failure is to close the stream.
    response.close()
  }
}
/**
 * Returns a promise that resolves once a `setTimeout` of `ms` milliseconds fires.
 *
 * @param ms - Delay passed to `setTimeout`, in milliseconds.
 * @returns A promise that resolves with no value.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms)
  })
}
/**
 * Builds a random batch of 5 to 10 demo items.
 *
 * Every item gets an id of the form `item-<timestamp>-<index>`, a label made
 * of a random adjective and noun, and an integer score from 0 to 100. When
 * `params` has entries, they are assigned to items round-robin as `source`;
 * otherwise `source` is `null`.
 *
 * @param params - Key/value pairs to attach to the generated items.
 * @returns The generated items, in index order.
 */
function generateRandomItems(params: Record<string, string>) {
  const sources: { name: string; value: string }[] = []
  for (const [name, value] of Object.entries(params)) {
    sources.push({ name, value })
  }

  const total = 5 + Math.floor(Math.random() * 6)

  const adjectives = ['swift', 'lazy', 'bold', 'calm', 'fierce', 'gentle', 'sharp', 'wild']
  const nouns = ['falcon', 'river', 'mountain', 'crystal', 'thunder', 'shadow', 'ember', 'frost']

  // Draws one uniformly random entry from a word list.
  const pickWord = (words: string[]) => words[Math.floor(Math.random() * words.length)]

  // Fields are evaluated top to bottom: timestamp, adjective, noun, score.
  const makeItem = (index: number) => ({
    id: `item-${Date.now()}-${index}`,
    label: `${pickWord(adjectives)} ${pickWord(nouns)}`,
    score: Math.round(Math.random() * 100),
    source: sources.length > 0 ? sources[index % sources.length] : null,
  })

  const items: ReturnType<typeof makeItem>[] = []
  for (let index = 0; index < total; index += 1) {
    items.push(makeItem(index))
  }
  return items
}
Streaming AI responses
Stream OpenAI responses in real time:
import OpenAI from 'openai'
// Module-level OpenAI client; the API key is read from the OPENAI_API_KEY
// environment variable when this module is loaded.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
/**
 * Relays an OpenAI chat completion to the client as Server-Sent Events.
 *
 * Takes `message` from the request body, sends it as a single user message
 * with `stream: true`, and forwards every non-empty content delta as a
 * `data:` frame. A final `data: [DONE]` frame marks normal completion.
 *
 * The response is always closed, even when the OpenAI request or the stream
 * iteration throws; in that case no `[DONE]` frame is written and the error
 * still propagates to the caller.
 */
export const handler: Handlers<typeof config> = async ({ request, response }, { logger }) => {
  const { message } = request.body
  logger.info('Streaming AI response', { message })

  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    'connection': 'keep-alive',
  })

  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [{ role: 'user', content: message }],
      stream: true,
    })

    for await (const chunk of stream) {
      // Chunks without a text delta (missing, null or empty content) are skipped.
      const content = chunk.choices[0]?.delta?.content
      if (content) {
        response.stream.write(`data: ${JSON.stringify({ content })}\n\n`)
      }
    }
    response.stream.write('data: [DONE]\n\n')
  } finally {
    // The 200 status and SSE headers are already set, so a failure can only be
    // surfaced to the client by ending the stream.
    response.close()
  }
}
Motia Streams
Motia Streams provide real-time data synchronization across clients with automatic state management.
Defining a stream
Create a stream configuration:
// streams/todo.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'
// Zod schema for a single todo item; also the source of the exported `Todo` type.
const todoSchema = z.object({
id: z.string(),
description: z.string(),
// Plain string; the create-todo step fills it with `new Date().toISOString()`.
createdAt: z.string(),
// Optional: may be omitted when the todo is created.
dueDate: z.string().optional(),
// Optional: absent on creation, written later when the todo is marked complete.
completedAt: z.string().optional(),
})
// Stream definition: names the stream 'todo', validates items against todoSchema,
// and uses the join/leave hooks to keep a count of watchers.
export const config: StreamConfig = {
baseConfig: { storageType: 'default' },
name: 'todo',
schema: todoSchema,
// Presumably invoked by the framework when a client subscribes (inferred from
// the hook name and log message). Returns { unauthorized: false }.
onJoin: async (subscription, context, authContext) => {
// Track who's watching: add 1 at path 'watching' on the entry addressed by
// ('watching', subscription.groupId).
// NOTE(review): this writes through context.streams.inbox, not
// context.streams.todo; confirm a stream named 'inbox' is defined.
await context.streams.inbox.update('watching', subscription.groupId, [
{ type: 'increment', path: 'watching', by: 1 },
])
context.logger.info('Todo stream joined', { subscription, authContext })
return { unauthorized: false }
},
// Counterpart of onJoin: subtracts 1 from the same 'watching' path and logs.
// Nothing is returned.
onLeave: async (subscription, context, authContext) => {
await context.streams.inbox.update('watching', subscription.groupId, [
{ type: 'decrement', path: 'watching', by: 1 },
])
context.logger.info('Todo stream left', { subscription, authContext })
},
}
// Static type of a todo item, inferred from todoSchema so the type and the
// runtime validation cannot drift apart.
export type Todo = z.infer<typeof todoSchema>
Writing to streams
Update stream data from your Steps:
// steps/create-todo.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import { z } from 'zod'
import type { Todo } from './todo.stream'
// Step definition for creating a todo: POST /todo with a schema-checked body
// and a declared shape for the 200 response.
export const config = {
name: 'CreateTodo',
description: 'Create a new todo item',
flows: ['todo-app'],
triggers: [
http('POST', '/todo', {
// Request body: `description` is required, `dueDate` may be omitted.
bodySchema: z.object({
description: z.string(),
dueDate: z.string().optional()
}),
// Response bodies keyed by HTTP status.
// NOTE(review): the handler also returns status 400, which has no entry here; confirm that is intended.
responseSchema: {
200: z.object({
id: z.string(),
description: z.string(),
createdAt: z.string(),
}),
},
}),
],
// Empty: the handler for this step never calls enqueue.
enqueues: [],
} as const satisfies StepConfig
/**
 * Creates a todo from the request body and publishes it.
 *
 * Responds with 400 when `description` is missing or empty. Otherwise the new
 * todo is written to the `todo` stream under group `inbox`, mirrored into
 * state under `todos`, and the stream's `new_value` is returned with status 200.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger, streams, state }) => {
  logger.info('Creating new todo', { body: request.body })

  const { description, dueDate } = request.body

  // Id = creation timestamp plus up to seven random base-36 characters.
  const stamp = Date.now()
  const suffix = Math.random().toString(36).slice(2, 9)
  const todoId = `todo-${stamp}-${suffix}`

  if (!description) {
    return { status: 400, body: { error: 'Description is required' } }
  }

  const createdAt = new Date().toISOString()
  const newTodo: Todo = { id: todoId, description, createdAt, dueDate }

  // Stream write first (this is what subscribed clients see), then state.
  const written = await streams.todo.set('inbox', todoId, newTodo)
  await state.set('todos', todoId, newTodo)

  logger.info('Todo created successfully', { todoId })
  return { status: 200, body: written.new_value }
}
Consuming streams in React
Subscribe to streams from your frontend:
import { useMotiaStream } from '@motia/stream-client-react'
/**
 * Renders the todos of the `todo` stream's `inbox` group.
 *
 * The heading shows a green or red dot depending on `isConnected`; each list
 * entry shows the description, followed by a check mark once `completedAt`
 * is set.
 */
function TodoList() {
  const stream = useMotiaStream({
    url: 'ws://localhost:8787',
    streamName: 'todo',
    groupId: 'inbox',
  })

  const statusIcon = stream.isConnected ? '🟢' : '🔴'

  // `items` exposes `.values()`; it is flattened to an array for rendering.
  const rows = Array.from(stream.items.values()).map((todo) => (
    <li key={todo.id}>
      {todo.description}
      {todo.completedAt && ' ✓'}
    </li>
  ))

  return (
    <div>
      <h2>Todos {statusIcon}</h2>
      <ul>{rows}</ul>
    </div>
  )
}
Stream updates
Update existing stream items:
export const handler: Handlers<typeof config> = async ({ request }, { streams, logger }) => {
const { todoId } = request.params
await streams.todo.update('inbox', todoId, [
{ type: 'set', path: 'completedAt', value: new Date().toISOString() },
])
logger.info('Todo marked as complete', { todoId })
return { status: 200, body: { ok: true } }
}
Stream triggers
React to stream changes with stream triggers:
export const config = {
name: 'OnTodoCreated',
triggers: [
{
type: 'stream',
streamName: 'todo',
event: 'set',
},
],
enqueues: ['todo.analytics'],
} as const satisfies StepConfig
/**
 * Handles a `set` event on the `todo` stream: logs the new item and enqueues a
 * `todo_created` record on the `todo.analytics` topic.
 */
export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  const { groupId: group, itemId: todoId, new_value: todo } = input

  logger.info('New todo created', { groupId: group, itemId: todoId, todo })

  // The timestamp is the time of handling, not the todo's own `createdAt`.
  await enqueue({
    topic: 'todo.analytics',
    data: { event: 'todo_created', todoId, timestamp: new Date().toISOString() },
  })
}
Real-time updates with state triggers
Use state triggers for real-time notifications:
export const config = {
name: 'OnOrderUpdate',
triggers: [
{
type: 'state',
namespace: 'orders',
event: 'set',
},
],
enqueues: ['notification.send'],
} as const satisfies StepConfig
/**
 * Handles a `set` event in the `orders` state namespace and notifies the
 * order's user when its status changed.
 *
 * Does nothing when there is no new value or when the status is the same as
 * before. A first write (no `old_value`) counts as a change as long as the new
 * value carries a status.
 */
export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  const { key, new_value, old_value } = input

  // Without a new value there is nobody to notify; this guard also keeps the
  // property reads below from throwing.
  if (!new_value) {
    return
  }

  if (old_value?.status === new_value.status) {
    return
  }

  logger.info('Order status changed', {
    orderId: key,
    oldStatus: old_value?.status,
    newStatus: new_value.status,
  })

  await enqueue({
    topic: 'notification.send',
    data: {
      userId: new_value.userId,
      message: `Your order ${key} is now ${new_value.status}`,
    },
  })
}
Stream authentication
Protect streams with authentication:
export const config: StreamConfig = {
name: 'private-messages',
schema: messageSchema,
onJoin: async (subscription, context, authContext) => {
const userId = authContext?.userId
if (!userId) {
return { unauthorized: true, reason: 'Not authenticated' }
}
// Check if user has access to this group
const hasAccess = await checkUserAccess(userId, subscription.groupId)
if (!hasAccess) {
return { unauthorized: true, reason: 'Access denied' }
}
context.logger.info('User joined stream', { userId, groupId: subscription.groupId })
return { unauthorized: false }
},
}
Related concepts
Streams
Learn about Motia Streams
Stream triggers
React to stream changes
State triggers
React to state changes
AI agents
Build AI agents with streaming