
Overview

Streams provide real-time data synchronization and event broadcasting. They support CRUD operations, subscriptions, and can trigger workflow steps when data changes. Access streams through the context object:
async (input, ctx) => {
  await ctx.streams.myStream.set(groupId, itemId, data)
  const item = await ctx.streams.myStream.get(groupId, itemId)
}

Stream class

class Stream<TData> {
  constructor(readonly config: StreamConfig)
  
  get(groupId: string, itemId: string): Promise<TData | null>
  set(groupId: string, itemId: string, data: TData): Promise<StreamSetResult<TData> | null>
  update(groupId: string, itemId: string, ops: UpdateOp[]): Promise<StreamSetResult<TData> | null>
  delete(groupId: string, itemId: string): Promise<void>
  list(groupId: string): Promise<TData[]>
  listGroups(): Promise<string[]>
  send<T>(channel: StateStreamEventChannel, event: StateStreamEvent<T>): Promise<void>
}

Configuration

Define streams in separate .stream.ts files:
// parallel-merge.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const parallelMergeSchema = z.object({
  startedAt: z.number(),
  totalSteps: z.number(),
  completedSteps: z.number(),
})

export const config: StreamConfig = {
  name: 'parallelMerge',
  schema: parallelMergeSchema,
  baseConfig: { storageType: 'default' },
  onJoin: async (subscription, ctx, authContext) => {
    // Called when a client subscribes
    return { allowed: true }
  },
  onLeave: async (subscription, ctx, authContext) => {
    // Called when a client unsubscribes
  },
}

export type ParallelMergeData = z.infer<typeof parallelMergeSchema>

StreamConfig

- `name` (`string`, required): Unique stream identifier.
- `schema` (`StepSchemaInput`, required): Zod schema or JSON Schema defining the data structure.
- `baseConfig` (`{ storageType: 'default' }`, required): Storage configuration (currently only `'default'` is supported).
- `onJoin` (`(subscription, ctx, authContext?) => Promise<StreamJoinResult>`, optional): Called when a client subscribes. Return `{ allowed: true }` to permit the subscription.
- `onLeave` (`(subscription, ctx, authContext?) => Promise<void>`, optional): Called when a client unsubscribes.
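Because `onJoin` receives the optional `authContext`, it can gate subscriptions per client. A minimal sketch, assuming `authContext` carries a decoded `userId` and that the `subscription` object exposes the requested `groupId` (both field names are illustrative, not confirmed API):

```typescript
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const chatSchema = z.object({ from: z.string(), text: z.string() })

export const config: StreamConfig = {
  name: 'chat',
  schema: chatSchema,
  baseConfig: { storageType: 'default' },
  onJoin: async (subscription, ctx, authContext) => {
    // Only let an authenticated user subscribe to their own group
    const allowed = authContext?.userId === subscription.groupId
    return { allowed }
  },
}
```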

Methods

get()

Retrieve an item from the stream.
- `groupId` (`string`, required): Group identifier for organizing items.
- `itemId` (`string`, required): Item identifier within the group.

Returns `Promise<TData | null>`: the item data, or `null` if not found.
async (input, ctx) => {
  const status = await ctx.streams.parallelMerge.get('merge-groups', 'job-123')
  
  if (!status) {
    return { status: 404, body: { error: 'Not found' } }
  }
  
  return { status: 200, body: status }
}

set()

Create or update an item in the stream.
- `groupId` (`string`, required): Group identifier.
- `itemId` (`string`, required): Item identifier.
- `data` (`TData`, required): Item data matching the stream schema.

Returns `Promise<StreamSetResult<TData> | null>`: result containing the stored data, version, and timestamp.
type StreamSetResult<T> = {
  value: T
  version: number
  timestamp: number
}

Example

async (input, ctx) => {
  const result = await ctx.streams.parallelMerge.set(
    'merge-groups',
    'job-123',
    {
      startedAt: Date.now(),
      totalSteps: 10,
      completedSteps: 0,
    }
  )
  
  ctx.logger.info('Stream item created', {
    version: result?.version,
    timestamp: result?.timestamp,
  })
}

update()

Update an item using JSON Patch operations.
- `groupId` (`string`, required): Group identifier.
- `itemId` (`string`, required): Item identifier.
- `ops` (`UpdateOp[]`, required): Array of JSON Patch operations.

Returns `Promise<StreamSetResult<TData> | null>`: result containing the updated data and metadata.
type UpdateOp = 
  | { op: 'add', path: string, value: any }
  | { op: 'remove', path: string }
  | { op: 'replace', path: string, value: any }
  | { op: 'move', from: string, path: string }
  | { op: 'copy', from: string, path: string }
  | { op: 'test', path: string, value: any }

Example

async (input, ctx) => {
  const result = await ctx.streams.parallelMerge.update(
    'merge-groups',
    'job-123',
    [
      { op: 'replace', path: '/completedSteps', value: 5 },
    ]
  )
  
  return { status: 200, body: result?.value }
}

delete()

Remove an item from the stream.
- `groupId` (`string`, required): Group identifier.
- `itemId` (`string`, required): Item identifier.

Returns `Promise<void>`: resolves when the item is deleted.
async (input, ctx) => {
  await ctx.streams.parallelMerge.delete('merge-groups', 'job-123')
  
  ctx.logger.info('Stream item deleted', { jobId: 'job-123' })
  
  return { status: 200, body: { deleted: true } }
}

list()

Retrieve all items in a group.
- `groupId` (`string`, required): Group identifier.

Returns `Promise<TData[]>`: array of all items in the group.
async (input, ctx) => {
  const jobs = await ctx.streams.parallelMerge.list('merge-groups')
  
  const activeJobs = jobs.filter(job => job.completedSteps < job.totalSteps)
  
  return {
    status: 200,
    body: { jobs, activeCount: activeJobs.length },
  }
}

listGroups()

Retrieve all group IDs in the stream.
Returns `Promise<string[]>`: array of all group identifiers.
async (input, ctx) => {
  const groups = await ctx.streams.parallelMerge.listGroups()
  
  ctx.logger.info('Stream groups', { count: groups.length })
  
  return { status: 200, body: { groups } }
}

send()

Send a custom event to stream subscribers.
- `channel` (`StateStreamEventChannel`, required): Channel specifying the target group and optional item.
type StateStreamEventChannel = {
  groupId: string
  id?: string  // Optional item ID
}
- `event` (`StateStreamEvent<T>`, required): Event with a custom type and data.
type StateStreamEvent<T> = {
  type: string
  data: T
}
Returns `Promise<void>`: resolves when the event has been sent.

Example

async (input, ctx) => {
  await ctx.streams.parallelMerge.send(
    { groupId: 'merge-groups', id: 'job-123' },
    {
      type: 'progress-update',
      data: { percent: 50, message: 'Halfway there' },
    }
  )
  
  ctx.logger.info('Custom event sent to subscribers')
}

Stream triggers

Trigger workflow steps when stream data changes:
import { step, stream } from 'motia'
import type { StreamTriggerInput } from 'motia'
import type { ParallelMergeData } from './parallel-merge.stream'

export const { config, handler } = step(
  {
    name: 'OnMergeComplete',
    description: 'Trigger when all parallel steps complete',
    triggers: [
      stream('parallelMerge', {
        groupId: 'merge-groups',
        condition: (input: StreamTriggerInput<ParallelMergeData>) => {
          // Only trigger when all steps are complete
          return (
            input.event.type === 'update' &&
            input.event.data.completedSteps === input.event.data.totalSteps
          )
        },
      }),
    ],
  },
  async (input, ctx) => {
    const { event, groupId, id } = input
    const { data } = event
    
    const duration = Date.now() - data.startedAt
    
    ctx.logger.info('All parallel steps completed', {
      jobId: id,
      totalSteps: data.totalSteps,
      duration,
    })
  }
)

StreamTriggerInput

type StreamTriggerInput<T> = {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}

type StreamEvent<T> =
  | { type: 'create', data: T }
  | { type: 'update', data: T }
  | { type: 'delete', data: T }
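Because `StreamEvent<T>` is a discriminated union, a trigger handler can switch on `event.type` and TypeScript will narrow `event.data` in each branch. An illustrative, self-contained sketch (the types are restated locally; `describe` is not part of the Motia API):

```typescript
type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }

type ParallelMergeData = { startedAt: number; totalSteps: number; completedSteps: number }

// Narrow on the discriminant to handle each lifecycle event
function describe(event: StreamEvent<ParallelMergeData>): string {
  switch (event.type) {
    case 'create':
      return `job started with ${event.data.totalSteps} steps`
    case 'update':
      return `progress: ${event.data.completedSteps}/${event.data.totalSteps}`
    case 'delete':
      return 'job removed'
  }
}
```

For example, `describe({ type: 'update', data: { startedAt: 0, totalSteps: 10, completedSteps: 5 } })` yields `'progress: 5/10'`.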

Usage patterns

Progress tracking

Track progress of parallel operations:
// Initialize progress
import { step, http, queue } from 'motia'

export const { config, handler } = step(
  {
    name: 'StartParallelJobs',
    triggers: [http('POST', '/jobs/parallel')],
  },
  async (input, ctx) => {
    const jobId = generateId() // your unique-ID helper
    const tasks = input.body.tasks
    
    // Initialize stream item
    await ctx.streams.parallelMerge.set('merge-groups', jobId, {
      startedAt: Date.now(),
      totalSteps: tasks.length,
      completedSteps: 0,
    })
    
    // Start tasks
    for (const task of tasks) {
      await ctx.enqueue({ topic: 'process-task', data: { jobId, task } })
    }
    
    return { status: 200, body: { jobId } }
  }
)

// Update progress
export const processTask = step(
  {
    name: 'ProcessTask',
    triggers: [queue('process-task')],
  },
  async (input, ctx) => {
    const { jobId, task } = input

    await runTask(task) // your task-processing logic

    // Increment the completed count
    const current = await ctx.streams.parallelMerge.get('merge-groups', jobId)
    if (current) {
      await ctx.streams.parallelMerge.update('merge-groups', jobId, [
        { op: 'replace', path: '/completedSteps', value: current.completedSteps + 1 },
      ])
    }
  }
)

Real-time notifications

Broadcast custom events to subscribers:
async (input, ctx) => {
  // Update data
  await ctx.streams.notifications.set('user-123', 'latest', {
    message: 'Your order has shipped',
    timestamp: Date.now(),
  })
  
  // Send custom event
  await ctx.streams.notifications.send(
    { groupId: 'user-123' },
    {
      type: 'notification',
      data: {
        title: 'Order Shipped',
        body: 'Your order is on the way',
        priority: 'high',
      },
    }
  )
}

Conditional triggers

React to specific state changes:
import { step, stream } from 'motia'
import type { StreamTriggerInput } from 'motia'

// Shape of the 'alerts' stream data (normally inferred from its stream schema)
type Alert = { priority: 'low' | 'high'; assignedTo: string; message: string }
export const { config, handler } = step(
  {
    name: 'OnHighPriorityAlert',
    triggers: [
      stream('alerts', {
        condition: (input: StreamTriggerInput<Alert>) => {
          return (
            input.event.type === 'create' &&
            input.event.data.priority === 'high'
          )
        },
      }),
    ],
    enqueues: ['send-notification'],
  },
  async (input, ctx) => {
    const alert = input.event.data
    
    ctx.logger.warn('High priority alert', { alert })
    
    await ctx.enqueue({
      topic: 'send-notification',
      data: {
        recipient: alert.assignedTo,
        message: alert.message,
        urgency: 'immediate',
      },
    })
  }
)

Best practices

  1. Use typed streams: Export the inferred type from the stream's schema so handlers stay type-safe:
    export type MyStreamData = z.infer<typeof myStreamSchema>
    // get() is typed by the stream's schema, so item is MyStreamData | null
    const item = await ctx.streams.myStream.get(groupId, itemId)
    
  2. Organize with groups: Use groups to partition data logically:
    await ctx.streams.progress.set('user-123', 'task-456', data)
    await ctx.streams.progress.set('user-789', 'task-456', data)
    
  3. Prefer update() for modifications: Use JSON Patch ops, and guard read-modify-write sequences with a test op (in JSON Patch, a failed test rejects the whole patch):
    // Good: guarded increment, rejected if another writer changed count first
    const current = await ctx.streams.counters.get(groupId, itemId)
    if (current) {
      await ctx.streams.counters.update(groupId, itemId, [
        { op: 'test', path: '/count', value: current.count },
        { op: 'replace', path: '/count', value: current.count + 1 },
      ])
    }
    
    // Avoid: set() silently overwrites concurrent changes
    const stale = await ctx.streams.counters.get(groupId, itemId)
    await ctx.streams.counters.set(groupId, itemId, { count: stale.count + 1 })
    
  4. Use conditions for selective triggers: Filter events in trigger conditions:
    stream('myStream', {
      condition: (input) => input.event.data.shouldTrigger === true,
    })
    
  5. Clean up deleted items: Remove obsolete stream data:
    await ctx.streams.temporary.delete(groupId, itemId)
    
