Overview
Streams provide real-time data synchronization and event broadcasting. They support CRUD operations and subscriptions, and they can trigger workflow steps when data changes.
Access streams through the context object:
async (input, ctx) => {
  await ctx.streams.myStream.set(groupId, itemId, data)
  const item = await ctx.streams.myStream.get(groupId, itemId)
}
Stream class
class Stream<TData> {
  constructor(readonly config: StreamConfig)
  get(groupId: string, itemId: string): Promise<TData | null>
  set(groupId: string, itemId: string, data: TData): Promise<StreamSetResult<TData> | null>
  update(groupId: string, itemId: string, ops: UpdateOp[]): Promise<StreamSetResult<TData> | null>
  delete(groupId: string, itemId: string): Promise<void>
  list(groupId: string): Promise<TData[]>
  listGroups(): Promise<string[]>
  send<T>(channel: StateStreamEventChannel, event: StateStreamEvent<T>): Promise<void>
}
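For unit-testing handlers without a running backend, an in-memory stand-in with the same method shapes can be useful. The sketch below is a hypothetical test double, not Motia's implementation: the class name InMemoryStream is invented, update() handles only top-level 'replace' ops, and the version counter starting at 1 is an assumption.

```typescript
// Types mirrored from this page; the op type is narrowed to 'replace' to keep the sketch short.
type StreamSetResult<T> = { value: T; version: number; timestamp: number }
type ReplaceOp = { op: 'replace'; path: string; value: unknown }

// Hypothetical test double: NOT Motia's implementation.
class InMemoryStream<TData extends Record<string, unknown>> {
  private groups = new Map<string, Map<string, StreamSetResult<TData>>>()

  async get(groupId: string, itemId: string): Promise<TData | null> {
    return this.groups.get(groupId)?.get(itemId)?.value ?? null
  }

  async set(groupId: string, itemId: string, data: TData): Promise<StreamSetResult<TData>> {
    const group = this.groups.get(groupId) ?? new Map<string, StreamSetResult<TData>>()
    this.groups.set(groupId, group)
    // Assumption: versions start at 1 and increase by one per write.
    const version = (group.get(itemId)?.version ?? 0) + 1
    const result: StreamSetResult<TData> = { value: data, version, timestamp: Date.now() }
    group.set(itemId, result)
    return result
  }

  async update(groupId: string, itemId: string, ops: ReplaceOp[]): Promise<StreamSetResult<TData> | null> {
    const current = await this.get(groupId, itemId)
    if (!current) return null
    const next: Record<string, unknown> = { ...current }
    // Top-level paths only: '/count' becomes the key 'count'.
    for (const { path, value } of ops) next[path.slice(1)] = value
    return this.set(groupId, itemId, next as TData)
  }

  async delete(groupId: string, itemId: string): Promise<void> {
    this.groups.get(groupId)?.delete(itemId)
  }

  async list(groupId: string): Promise<TData[]> {
    const group = this.groups.get(groupId)
    return group ? Array.from(group.values(), (entry) => entry.value) : []
  }

  async listGroups(): Promise<string[]> {
    return Array.from(this.groups.keys())
  }
}
```

A handler under test can then receive { streams: { myStream: new InMemoryStream() } } in place of the real context.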
Configuration
Define streams in separate .stream.ts files:
// parallel-merge.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'
const parallelMergeSchema = z.object({
  startedAt: z.number(),
  totalSteps: z.number(),
  completedSteps: z.number(),
})
export const config: StreamConfig = {
  name: 'parallelMerge',
  schema: parallelMergeSchema,
  baseConfig: { storageType: 'default' },
  onJoin: async (subscription, ctx, authContext) => {
    // Called when a client subscribes
    return { allowed: true }
  },
  onLeave: async (subscription, ctx, authContext) => {
    // Called when a client unsubscribes
  },
}
export type ParallelMergeData = z.infer<typeof parallelMergeSchema>
StreamConfig
schema
Zod schema or JSON Schema defining the data structure
baseConfig
{ storageType: 'default' }
required
Storage configuration (currently only 'default' is supported)
onJoin
(subscription, ctx, authContext?) => Promise<StreamJoinResult>
Callback when a client subscribes. Return { allowed: true } to permit subscription
onLeave
(subscription, ctx, authContext?) => Promise<void>
Callback when a client unsubscribes
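As an illustration of how onJoin might gate subscriptions, the sketch below allows a client to join only the group named after its own user ID. The shapes of subscription (a groupId field) and authContext (a userId field) are assumptions made for this example, as is returning { allowed: false } to reject; this page only confirms { allowed: true }.

```typescript
// Assumed shapes for illustration; check the types exported by your Motia version.
type Subscription = { groupId: string; id?: string }
type AuthContext = { userId: string }
type StreamJoinResult = { allowed: boolean }

// Allow a client to join only the group that matches its own user ID.
const onJoin = async (
  subscription: Subscription,
  _ctx: unknown,
  authContext?: AuthContext,
): Promise<StreamJoinResult> => {
  // Unauthenticated clients are rejected (assumed semantics of allowed: false).
  if (!authContext) return { allowed: false }
  return { allowed: subscription.groupId === authContext.userId }
}
```

Keeping the check in a standalone function like this makes it possible to test the rule without starting a stream server.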
Methods
get()
Retrieve an item from the stream.
groupId
string
required
Group identifier for organizing items
itemId
string
required
Item identifier within the group
return
Promise<TData | null>
The item data or null if not found
Example
async (input, ctx) => {
  const status = await ctx.streams.parallelMerge.get('merge-groups', 'job-123')
  if (!status) {
    return { status: 404, body: { error: 'Not found' } }
  }
  return { status: 200, body: status }
}
set()
Create or update an item in the stream.
data
TData
required
Item data matching the stream schema
return
Promise<StreamSetResult<TData> | null>
Result containing the stored data, version, and timestamp
type StreamSetResult<T> = {
  value: T
  version: number
  timestamp: number
}
Example
async (input, ctx) => {
  const result = await ctx.streams.parallelMerge.set(
    'merge-groups',
    'job-123',
    {
      startedAt: Date.now(),
      totalSteps: 10,
      completedSteps: 0,
    }
  )
  ctx.logger.info('Stream item created', {
    version: result?.version,
    timestamp: result?.timestamp,
  })
}
update()
Update an item using JSON Patch operations.
ops
UpdateOp[]
required
Array of JSON Patch operations
return
Promise<StreamSetResult<TData> | null>
Result containing the updated data and metadata
type UpdateOp =
  | { op: 'add', path: string, value: any }
  | { op: 'remove', path: string }
  | { op: 'replace', path: string, value: any }
  | { op: 'move', from: string, path: string }
  | { op: 'copy', from: string, path: string }
  | { op: 'test', path: string, value: any }
Example
async (input, ctx) => {
  const result = await ctx.streams.parallelMerge.update(
    'merge-groups',
    'job-123',
    [
      { op: 'replace', path: '/completedSteps', value: 5 },
    ]
  )
  return { status: 200, body: result?.value }
}
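To make the effect of individual operations concrete, here is a small standalone applier for a subset of JSON Patch (RFC 6902): add, remove, replace, and test on object paths. It is a sketch for building intuition, not the code update() runs; the names applyPatch and parsePointer are invented here, array indices and the move and copy ops are left out, and test uses a simplified JSON-string comparison.

```typescript
type PatchOp =
  | { op: 'add' | 'replace' | 'test'; path: string; value: unknown }
  | { op: 'remove'; path: string }

type Doc = Record<string, unknown>

// Split a JSON Pointer such as '/a/b' into ['a', 'b'], unescaping ~1 and ~0 (RFC 6901).
const parsePointer = (path: string): string[] =>
  path.split('/').slice(1).map((part) => part.replace(/~1/g, '/').replace(/~0/g, '~'))

// Apply ops to a deep copy. Throws on a missing path or a failed 'test',
// so a rejected patch leaves the input document untouched.
function applyPatch<T extends Doc>(doc: T, ops: PatchOp[]): T {
  const copy = JSON.parse(JSON.stringify(doc)) as Doc
  for (const op of ops) {
    const keys = parsePointer(op.path)
    const last = keys.pop()
    if (last === undefined) throw new Error('root path is not supported in this sketch')
    // Walk down to the object that holds the target key.
    let parent: Doc = copy
    for (const key of keys) {
      const child = parent[key]
      if (typeof child !== 'object' || child === null) throw new Error(`missing path: ${op.path}`)
      parent = child as Doc
    }
    if (op.op === 'add') {
      parent[last] = op.value
      continue
    }
    if (!(last in parent)) throw new Error(`missing path: ${op.path}`)
    if (op.op === 'remove') delete parent[last]
    else if (op.op === 'replace') parent[last] = op.value
    else if (JSON.stringify(parent[last]) !== JSON.stringify(op.value)) {
      throw new Error(`test failed: ${op.path}`)
    }
  }
  return copy as T
}
```

Pairing a test op with a replace op expresses "change this field only if it still has the value I read", which is the building block for guarded updates.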
delete()
Remove an item from the stream.
return
Promise<void>
Resolves when the item is deleted
Example
async (input, ctx) => {
  await ctx.streams.parallelMerge.delete('merge-groups', 'job-123')
  ctx.logger.info('Stream item deleted', { jobId: 'job-123' })
  return { status: 200, body: { deleted: true } }
}
list()
Retrieve all items in a group.
return
Promise<TData[]>
Array of all items in the group
Example
async (input, ctx) => {
  const jobs = await ctx.streams.parallelMerge.list('merge-groups')
  const activeJobs = jobs.filter(job => job.completedSteps < job.totalSteps)
  return {
    status: 200,
    body: { jobs, activeCount: activeJobs.length },
  }
}
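The array returned by list() is plain data, so any aggregation can live in an ordinary function. As a sketch, the hypothetical helper summarizeJobs below (not part of Motia) counts active and finished jobs and computes an overall completion percentage for items shaped like ParallelMergeData.

```typescript
// Same shape as the parallelMerge schema defined earlier on this page.
type ParallelMergeData = { startedAt: number; totalSteps: number; completedSteps: number }

// Summarize the array returned by list(): job counts plus overall completion.
function summarizeJobs(jobs: ParallelMergeData[]) {
  const active = jobs.filter((job) => job.completedSteps < job.totalSteps).length
  const totalSteps = jobs.reduce((sum, job) => sum + job.totalSteps, 0)
  const completedSteps = jobs.reduce((sum, job) => sum + job.completedSteps, 0)
  return {
    total: jobs.length,
    active,
    finished: jobs.length - active,
    // Guard against division by zero when the group is empty.
    percentComplete: totalSteps === 0 ? 0 : Math.round((completedSteps / totalSteps) * 100),
  }
}
```

Inside a handler this could be called as summarizeJobs(await ctx.streams.parallelMerge.list('merge-groups')).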
listGroups()
Retrieve all group IDs in the stream.
return
Promise<string[]>
Array of all group identifiers
Example
async (input, ctx) => {
  const groups = await ctx.streams.parallelMerge.listGroups()
  ctx.logger.info('Stream groups', { count: groups.length })
  return { status: 200, body: { groups } }
}
send()
Send a custom event to stream subscribers.
channel
StateStreamEventChannel
required
Channel specifying the target group and optional item
type StateStreamEventChannel = {
  groupId: string
  id?: string // Optional item ID
}
event
StateStreamEvent<T>
required
Event with custom type and data
type StateStreamEvent<T> = {
  type: string
  data: T
}
return
Promise<void>
Resolves when the event is sent
Example
async (input, ctx) => {
  await ctx.streams.parallelMerge.send(
    { groupId: 'merge-groups', id: 'job-123' },
    {
      type: 'progress-update',
      data: { percent: 50, message: 'Halfway there' },
    }
  )
  ctx.logger.info('Custom event sent to subscribers')
}
Stream triggers
Trigger workflow steps when stream data changes:
import { step, stream } from 'motia'
import type { StreamTriggerInput } from 'motia'
import type { ParallelMergeData } from './parallel-merge.stream'
export const { config, handler } = step(
  {
    name: 'OnMergeComplete',
    description: 'Trigger when all parallel steps complete',
    triggers: [
      stream('parallelMerge', {
        groupId: 'merge-groups',
        condition: (input: StreamTriggerInput<ParallelMergeData>) => {
          // Only trigger when all steps are complete
          return (
            input.event.type === 'update' &&
            input.event.data.completedSteps === input.event.data.totalSteps
          )
        },
      }),
    ],
  },
  async (input, ctx) => {
    const { event, groupId, id } = input
    const { data } = event
    const duration = Date.now() - data.startedAt
    ctx.logger.info('All parallel steps completed', {
      jobId: id,
      totalSteps: data.totalSteps,
      duration,
    })
  }
)
type StreamTriggerInput<T> = {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}
type StreamEvent<T> =
  | { type: 'create', data: T }
  | { type: 'update', data: T }
  | { type: 'delete', data: T }
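Because a trigger condition is a plain predicate over StreamTriggerInput, it can be pulled out into a named function and tested on its own. The sketch below restates the two types locally so it runs without Motia installed; isMergeComplete and makeInput are hypothetical names introduced for this example.

```typescript
// Types restated from this page so the sketch is self-contained.
type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }

type StreamTriggerInput<T> = {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}

type ParallelMergeData = { startedAt: number; totalSteps: number; completedSteps: number }

// The same rule as the OnMergeComplete condition, extracted as a named predicate.
const isMergeComplete = (input: StreamTriggerInput<ParallelMergeData>): boolean =>
  input.event.type === 'update' &&
  input.event.data.completedSteps === input.event.data.totalSteps

// Hypothetical test helper: wraps an event in a trigger input with fixed metadata.
const makeInput = (
  event: StreamEvent<ParallelMergeData>,
): StreamTriggerInput<ParallelMergeData> => ({
  type: 'stream',
  timestamp: 0,
  streamName: 'parallelMerge',
  groupId: 'merge-groups',
  id: 'job-123',
  event,
})
```

With these in place, isMergeComplete(makeInput({ type: 'update', data })) can be asserted for finished and unfinished items, and the same function can be passed as the condition option.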
Usage patterns
Progress tracking
Track progress of parallel operations:
// Initialize progress
// Assumes http and queue are exported by 'motia' alongside step and stream
import { randomUUID } from 'node:crypto'
import { step, http, queue } from 'motia'
export const { config, handler } = step(
  {
    name: 'StartParallelJobs',
    triggers: [http('POST', '/jobs/parallel')],
  },
  async (input, ctx) => {
    const jobId = randomUUID()
    const tasks = input.body.tasks
    // Initialize stream item
    await ctx.streams.parallelMerge.set('merge-groups', jobId, {
      startedAt: Date.now(),
      totalSteps: tasks.length,
      completedSteps: 0,
    })
    // Start tasks
    for (const task of tasks) {
      await ctx.enqueue({ topic: 'process-task', data: { jobId, task } })
    }
    return { status: 200, body: { jobId } }
  }
)
// Update progress (in its own step file)
export const { config, handler } = step(
  {
    name: 'ProcessTask',
    triggers: [queue('process-task')],
  },
  async (input, ctx) => {
    const { jobId, task } = input
    // runTask is a placeholder for your own task logic
    await runTask(task)
    // Increment completed count
    // Note: read-then-replace can miss increments if tasks finish concurrently
    const current = await ctx.streams.parallelMerge.get('merge-groups', jobId)
    if (current) {
      await ctx.streams.parallelMerge.update('merge-groups', jobId, [
        { op: 'replace', path: '/completedSteps', value: current.completedSteps + 1 },
      ])
    }
  }
)
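The ProcessTask step above reads the current count and then replaces it. If several tasks finish at nearly the same time, two of them can read the same value, and one increment is lost. The sketch below reproduces that with a hypothetical in-memory FakeStore (not a Motia API) and contrasts it with a retry loop guarded by a JSON Patch test op. It assumes the backend applies all ops of one update() call atomically and rejects the patch when test fails; this page does not confirm either behavior, so verify it against your storage adapter before relying on the pattern.

```typescript
type Progress = { completedSteps: number }
type Op =
  | { op: 'test'; path: '/completedSteps'; value: number }
  | { op: 'replace'; path: '/completedSteps'; value: number }

// Hypothetical in-memory store; each call yields once to stand in for a storage round trip.
class FakeStore {
  private item: Progress = { completedSteps: 0 }

  async get(): Promise<Progress> {
    await Promise.resolve()
    return { ...this.item }
  }

  // Applies all ops in one synchronous step (assumed atomic); throws if a 'test' op fails.
  async update(ops: Op[]): Promise<Progress> {
    await Promise.resolve()
    let next = this.item.completedSteps
    for (const op of ops) {
      if (op.op === 'test' && op.value !== next) throw new Error('test failed')
      if (op.op === 'replace') next = op.value
    }
    this.item = { completedSteps: next }
    return { ...this.item }
  }
}

// Read-then-replace, as in the step above: concurrent callers can overwrite each other.
async function naiveIncrement(store: FakeStore): Promise<void> {
  const current = await store.get()
  await store.update([
    { op: 'replace', path: '/completedSteps', value: current.completedSteps + 1 },
  ])
}

// Guarded variant: the patch applies only if the value is unchanged since the read.
async function guardedIncrement(store: FakeStore, maxAttempts = 10): Promise<void> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await store.get()
    try {
      await store.update([
        { op: 'test', path: '/completedSteps', value: current.completedSteps },
        { op: 'replace', path: '/completedSteps', value: current.completedSteps + 1 },
      ])
      return
    } catch {
      // Another writer got there first; re-read and try again.
    }
  }
  throw new Error('too many conflicts')
}

// Run three concurrent increments with each strategy and report the final counts.
async function runDemo(): Promise<{ naive: number; guarded: number }> {
  const naiveStore = new FakeStore()
  await Promise.all([1, 2, 3].map(() => naiveIncrement(naiveStore)))
  const guardedStore = new FakeStore()
  await Promise.all([1, 2, 3].map(() => guardedIncrement(guardedStore)))
  return {
    naive: (await naiveStore.get()).completedSteps,
    guarded: (await guardedStore.get()).completedSteps,
  }
}
```

In this fake, the naive version ends below 3 because every caller reads 0 before any write lands, while the guarded version reaches 3 at the cost of retries. Whether the same guarantee holds in production depends on the assumptions stated above.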
Real-time notifications
Broadcast custom events to subscribers:
async (input, ctx) => {
  // Update data
  await ctx.streams.notifications.set('user-123', 'latest', {
    message: 'Your order has shipped',
    timestamp: Date.now(),
  })
  // Send custom event
  await ctx.streams.notifications.send(
    { groupId: 'user-123' },
    {
      type: 'notification',
      data: {
        title: 'Order Shipped',
        body: 'Your order is on the way',
        priority: 'high',
      },
    }
  )
}
Conditional triggers
React to specific state changes:
import { step, stream } from 'motia'
import type { StreamTriggerInput } from 'motia'
// Hypothetical path; Alert is the alerts stream's data type (priority, assignedTo, message)
import type { Alert } from './alerts.stream'
export const { config, handler } = step(
  {
    name: 'OnHighPriorityAlert',
    triggers: [
      stream('alerts', {
        condition: (input: StreamTriggerInput<Alert>) => {
          return (
            input.event.type === 'create' &&
            input.event.data.priority === 'high'
          )
        },
      }),
    ],
    enqueues: ['send-notification'],
  },
  async (input, ctx) => {
    const alert = input.event.data
    ctx.logger.warn('High priority alert', { alert })
    await ctx.enqueue({
      topic: 'send-notification',
      data: {
        recipient: alert.assignedTo,
        message: alert.message,
        urgency: 'immediate',
      },
    })
  }
)
Best practices
- Use typed streams: Define TypeScript types for stream data:
  export type MyStreamData = z.infer<typeof myStreamSchema>
  // get() takes no type argument in the Stream class above; annotate the result instead
  const item: MyStreamData | null = await ctx.streams.myStream.get(groupId, itemId)
- Organize with groups: Use groups to partition data logically:
  await ctx.streams.progress.set('user-123', 'task-456', data)
  await ctx.streams.progress.set('user-789', 'task-456', data)
- Prefer update() for modifications: Use JSON Patch to change only the fields that need it:
  const current = await ctx.streams.counters.get(groupId, itemId)
  if (current) {
    // Good: one patch changes only /count and leaves other fields untouched
    await ctx.streams.counters.update(groupId, itemId, [
      { op: 'replace', path: '/count', value: current.count + 1 },
    ])
    // Avoid: set() rewrites the whole item and can clobber concurrent changes
    // await ctx.streams.counters.set(groupId, itemId, { count: current.count + 1 })
  }
- Use conditions for selective triggers: Filter events in trigger conditions:
  stream('myStream', {
    condition: (input) => input.event.data.shouldTrigger === true,
  })
- Clean up deleted items: Remove obsolete stream data:
  await ctx.streams.temporary.delete(groupId, itemId)