Streams provide real-time, reactive data management in Motia. Unlike state, streams automatically broadcast changes to connected clients and trigger workflow steps, making them ideal for collaborative features, live updates, and reactive architectures.

Overview

Streams organize data in a three-level hierarchy:
stream: documents
  ├─ group: workspace-1
  │   ├─ item: doc-123 → { title: 'Proposal', content: '...' }
  │   ├─ item: doc-456 → { title: 'Meeting Notes', content: '...' }
  │   └─ item: doc-789 → { title: 'Roadmap', content: '...' }
  └─ group: workspace-2
      ├─ item: doc-abc → { title: 'Design', content: '...' }
      └─ item: doc-xyz → { title: 'Specs', content: '...' }
  • Stream: Top-level container (e.g., documents, users, tasks)
  • Group: Logical partition within a stream (e.g., workspace ID, user ID)
  • Item: Individual data record with a unique ID
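
The hierarchy above can be pictured as a map of groups, each holding a map of items. The sketch below is a hypothetical in-memory model, not Motia's storage layer; the names `StreamModel`, `setItem`, and `getItem` are made up for illustration.

```typescript
// Hypothetical in-memory model of the stream / group / item hierarchy.
// It only illustrates the addressing scheme, not how Motia stores data.
type DocItemModel = { id: string; title: string; content: string }

// group ID -> (item ID -> item)
type StreamModel<T> = Map<string, Map<string, T>>

const documentsModel: StreamModel<DocItemModel> = new Map()

function setItem<T>(stream: StreamModel<T>, groupId: string, itemId: string, value: T): void {
  // Create the group lazily the first time an item is written to it.
  let group = stream.get(groupId)
  if (!group) {
    group = new Map<string, T>()
    stream.set(groupId, group)
  }
  group.set(itemId, value)
}

function getItem<T>(stream: StreamModel<T>, groupId: string, itemId: string): T | null {
  // A missing group and a missing item both resolve to null.
  return stream.get(groupId)?.get(itemId) ?? null
}

setItem(documentsModel, 'workspace-1', 'doc-123', { id: 'doc-123', title: 'Proposal', content: '...' })
setItem(documentsModel, 'workspace-2', 'doc-abc', { id: 'doc-abc', title: 'Design', content: '...' })
```

An item is always addressed by the pair (group ID, item ID), so the same item ID in two groups refers to two different records.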

Streams vs state

Use streams for:
  • Real-time collaboration (shared documents, chat, presence)
  • Live dashboards and monitoring
  • Reactive workflows triggered by data changes
  • Client-server data sync
  • Multi-user applications
Use state for:
  • Simple key-value storage
  • Session management
  • Rate limiting
  • Feature flags
  • Cache
  • Data that doesn’t need real-time sync
Key differences:
Feature | Streams | State
Real-time sync | Yes | No
Client subscriptions | Yes | No
Change events | Automatic | Manual via triggers
Structure | Stream/Group/Item | Scope/Key
Use case | Collaborative, reactive | Simple storage

Defining streams

Define each stream in its own file under your project's motia/streams/ directory:
// motia/streams/documents.stream.ts
import { stream, jsonSchema } from 'motia'
import { z } from 'zod'

const documentSchema = z.object({
  id: z.string(),
  title: z.string(),
  content: z.string(),
  updatedAt: z.number(),
  authorId: z.string()
})

export default stream({
  name: 'documents',
  schema: jsonSchema(documentSchema),
  baseConfig: { storageType: 'default' },
  
  // Optional: authenticate connections
  onJoin: async (subscription, ctx, authContext) => {
    const { groupId } = subscription
    
    // Verify user has access to workspace
    const workspace = await ctx.state.get('workspaces', groupId)
    const userId = authContext?.userId
    
    if (!workspace?.members?.includes(userId)) {
      return { authorized: false }
    }
    
    return { 
      authorized: true,
      context: { userId, workspaceId: groupId }
    }
  },
  
  // Optional: cleanup on disconnect
  onLeave: async (subscription, ctx, authContext) => {
    ctx.logger.info('User left stream', { 
      userId: authContext?.userId,
      groupId: subscription.groupId 
    })
  }
})
Streams defined in your motia/streams/ directory are automatically registered and accessible via ctx.streams.
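
The membership check inside onJoin can be pulled into a small pure function so it is easy to unit-test. The sketch below is a hypothetical helper (`decideJoin`, `WorkspaceRecord`, and `JoinDecision` are illustrative names, not Motia APIs). It assumes the same return shape as the handler above and adds an explicit guard for a missing user ID.

```typescript
// Hypothetical helper mirroring the membership check in the onJoin handler.
type WorkspaceRecord = { members?: string[] }

type JoinDecision =
  | { authorized: false }
  | { authorized: true; context: { userId: string; workspaceId: string } }

function decideJoin(workspace: WorkspaceRecord | null, groupId: string, userId?: string): JoinDecision {
  // Reject anonymous connections before looking at the member list.
  if (!userId) return { authorized: false }
  // Reject unknown workspaces and users who are not members.
  if (!workspace?.members?.includes(userId)) return { authorized: false }
  return { authorized: true, context: { userId, workspaceId: groupId } }
}

const allowed = decideJoin({ members: ['user-42'] }, 'workspace-1', 'user-42')
const anonymous = decideJoin({ members: ['user-42'] }, 'workspace-1')
const unknownWorkspace = decideJoin(null, 'workspace-1', 'user-42')
```

Keeping the decision separate from the state lookup means the handler only has to fetch the workspace and pass it in.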

Accessing streams

Access streams in step handlers via ctx.streams:
import { step, http } from 'motia'

export default step({
  name: 'get-document',
  triggers: [http('GET', '/workspaces/:workspaceId/documents/:docId')]
}, async (input, ctx) => {
  const { workspaceId, docId } = input.request.pathParams
  
  // Access the documents stream
  const documents = ctx.streams.documents
  
  // Get the document
  const doc = await documents.get(workspaceId, docId)
  
  if (!doc) {
    return { status: 404, body: { error: 'Document not found' } }
  }
  
  return { status: 200, body: doc }
})

Stream operations

Get

Retrieve an item from a stream:
const doc = await ctx.streams.documents.get('workspace-1', 'doc-123')
// Returns: { id: 'doc-123', title: '...', content: '...', ... } | null

Set

Create or replace an item:
await ctx.streams.documents.set('workspace-1', 'doc-123', {
  id: 'doc-123',
  title: 'My Document',
  content: 'Content here...',
  updatedAt: Date.now(),
  authorId: 'user-42'
})
// Broadcasts 'create' or 'update' event to subscribers
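
To make the 'create' versus 'update' distinction concrete, here is a minimal in-memory sketch. It assumes the event type depends only on whether the item already existed; `TinyStream` and `ChangeNotice` are hypothetical names, and this is not how Motia implements broadcasting.

```typescript
// Hypothetical sketch: how a set() could choose between 'create' and 'update'.
type ChangeNotice<T> = { type: 'create' | 'update'; groupId: string; id: string; data: T }
type ChangeListener<T> = (notice: ChangeNotice<T>) => void

class TinyStream<T> {
  private groups = new Map<string, Map<string, T>>()
  private listeners: ChangeListener<T>[] = []

  subscribe(listener: ChangeListener<T>): void {
    this.listeners.push(listener)
  }

  set(groupId: string, id: string, data: T): void {
    const group = this.groups.get(groupId) ?? new Map<string, T>()
    // The event type depends on whether the item existed before this write.
    const type = group.has(id) ? 'update' : 'create'
    group.set(id, data)
    this.groups.set(groupId, group)
    // Notify every subscriber about the change.
    for (const listener of this.listeners) listener({ type, groupId, id, data })
  }
}

const observed: string[] = []
const tinyDocs = new TinyStream<{ id: string; title: string }>()
tinyDocs.subscribe((notice) => observed.push(`${notice.type}:${notice.id}`))
tinyDocs.set('workspace-1', 'doc-123', { id: 'doc-123', title: 'Draft' })
tinyDocs.set('workspace-1', 'doc-123', { id: 'doc-123', title: 'Final' })
```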

Update

Apply partial updates:
import { step, http } from 'motia'

export default step({
  name: 'edit-document',
  triggers: [http('PATCH', '/workspaces/:workspaceId/documents/:docId')]
}, async (input, ctx) => {
  const { workspaceId, docId } = input.request.pathParams
  const { content } = input.request.body
  
  await ctx.streams.documents.update(workspaceId, docId, [
    { type: 'set', path: 'content', value: content },
    { type: 'set', path: 'updatedAt', value: Date.now() }
  ])
  // Broadcasts 'update' event with new values
  
  return { status: 200, body: { updated: true } }
})
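
The operation list passed to update can be read as a sequence of field assignments. The sketch below is a hypothetical illustration of that idea (`applyOps` is not a Motia function). It handles only 'set' operations on top-level paths; whether Motia supports nested paths or other operation types is not covered here.

```typescript
// Hypothetical sketch of applying 'set' operations to a stored item.
// Only top-level paths are handled.
type SetOp = { type: 'set'; path: string; value: unknown }

function applyOps<T extends Record<string, unknown>>(item: T, ops: SetOp[]): T {
  // Copy first so the previous version is left untouched.
  const next: Record<string, unknown> = { ...item }
  for (const op of ops) {
    next[op.path] = op.value
  }
  return next as T
}

const originalDoc = { id: 'doc-123', title: 'Proposal', content: 'old', updatedAt: 1 }
const patchedDoc = applyOps(originalDoc, [
  { type: 'set', path: 'content', value: 'new' },
  { type: 'set', path: 'updatedAt', value: 2 },
])
```

Fields that no operation touches (here `id` and `title`) keep their previous values, which is what distinguishes a partial update from a full set.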

Delete

Remove an item:
await ctx.streams.documents.delete('workspace-1', 'doc-123')
// Broadcasts 'delete' event to subscribers

List

Get all items in a group:
const docs = await ctx.streams.documents.list('workspace-1')
// Returns: Array<{ id: string, title: string, ... }>

List groups

Get all group IDs:
const workspaceIds = await ctx.streams.documents.listGroups()
// Returns: ['workspace-1', 'workspace-2', ...]

Stream triggers

React to stream changes with stream triggers:
import { step, stream } from 'motia'

export default step({
  name: 'on-document-change',
  triggers: [
    stream('documents', {
      groupId: 'workspace-123', // Optional: filter by group
      itemId: 'doc-456',        // Optional: filter by item
    })
  ],
  enqueues: ['document-indexed']
}, async (input, ctx) => {
  // indexDocument, updateIndex, and removeFromIndex are application-defined helpers
  const { event, groupId, id } = input
  
  switch (event.type) {
    case 'create':
      ctx.logger.info('Document created', { id, data: event.data })
      await indexDocument(event.data)
      break
      
    case 'update':
      ctx.logger.info('Document updated', { id, data: event.data })
      await updateIndex(event.data)
      break
      
    case 'delete':
      ctx.logger.info('Document deleted', { id })
      await removeFromIndex(id)
      break
  }
  
  await ctx.enqueue({
    topic: 'document-indexed',
    data: { groupId, id, type: event.type }
  })
})

Stream trigger input

interface StreamTriggerInput<T> {
  type: 'stream'
  timestamp: number        // When the event occurred
  streamName: string      // Name of the stream
  groupId: string        // Group ID
  id: string            // Item ID
  event: StreamEvent<T> // The event data
}

type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }
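
Because StreamEvent is a discriminated union, a switch on event.type can be checked for exhaustiveness by the compiler. The sketch below repeats the two types so it stands alone; `DocPayload` and `describeChange` are hypothetical names used only for illustration.

```typescript
// Types repeated from the definitions above so the sketch stands alone.
type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }

interface StreamTriggerInput<T> {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}

type DocPayload = { id: string; title: string }

// describeChange is a hypothetical helper, not part of Motia.
function describeChange(input: StreamTriggerInput<DocPayload>): string {
  const { event, groupId, id } = input
  switch (event.type) {
    case 'create':
      return `created ${groupId}/${id}: ${event.data.title}`
    case 'update':
      return `updated ${groupId}/${id}: ${event.data.title}`
    case 'delete':
      return `deleted ${groupId}/${id}`
    default: {
      // Stops type-checking if a new event type is added but not handled.
      const unreachable: never = event
      return unreachable
    }
  }
}

const sampleInput: StreamTriggerInput<DocPayload> = {
  type: 'stream',
  timestamp: 0,
  streamName: 'documents',
  groupId: 'workspace-1',
  id: 'doc-789',
  event: { type: 'update', data: { id: 'doc-789', title: 'Roadmap' } },
}
```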

Custom events

Send custom events to stream subscribers:
import { step, http } from 'motia'

export default step({
  name: 'notify-collaborators',
  triggers: [http('POST', '/documents/:docId/notify')]
}, async (input, ctx) => {
  const { docId } = input.request.pathParams
  const { message } = input.request.body
  
  await ctx.streams.documents.send(
    { groupId: 'workspace-1', id: docId },
    { 
      type: 'notification', 
      data: { message, timestamp: Date.now() } 
    }
  )
  // Subscribers receive this custom event
  
  return { status: 200, body: { sent: true } }
})

Client connections

Clients connect to streams via WebSocket:
// Client-side code
import { connectStream } from '@motia/client'

const stream = connectStream('documents', {
  groupId: 'workspace-1',
  auth: { token: userToken }
})

stream.on('create', (data) => {
  console.log('Document created:', data)
})

stream.on('update', (data) => {
  console.log('Document updated:', data)
})

stream.on('delete', (data) => {
  console.log('Document deleted:', data)
})

stream.on('notification', (data) => {
  console.log('Custom event:', data)
})
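
On the client, the three standard events are usually folded into a local cache that the UI renders from. The sketch below is a hypothetical reducer (`applyEvent`, `CachedDoc`, and `ClientEvent` are illustrative names). It assumes that create and update events carry the full item and that delete events carry at least the item ID; the actual payload shapes delivered by the client SDK may differ.

```typescript
// Hypothetical client-side cache; the event shapes are assumed for illustration.
type CachedDoc = { id: string; title: string }

type ClientEvent =
  | { type: 'create'; data: CachedDoc }
  | { type: 'update'; data: CachedDoc }
  | { type: 'delete'; data: { id: string } }

function applyEvent(cache: Map<string, CachedDoc>, ev: ClientEvent): void {
  if (ev.type === 'delete') {
    cache.delete(ev.data.id)
  } else {
    // 'create' and 'update' both store the latest full item.
    cache.set(ev.data.id, ev.data)
  }
}

const localDocs = new Map<string, CachedDoc>()
applyEvent(localDocs, { type: 'create', data: { id: 'doc-1', title: 'Draft' } })
applyEvent(localDocs, { type: 'create', data: { id: 'doc-2', title: 'Notes' } })
applyEvent(localDocs, { type: 'update', data: { id: 'doc-1', title: 'Final' } })
applyEvent(localDocs, { type: 'delete', data: { id: 'doc-2' } })
```

Each `stream.on(...)` callback can then call a function like this one and trigger a re-render from the cache.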

Use cases

Collaborative editor

import { step, http, stream } from 'motia'

export const editDocument = step({
  name: 'edit-document',
  triggers: [http('PATCH', '/docs/:id')]
}, async (input, ctx) => {
  const { id } = input.request.pathParams
  const { content } = input.request.body
  
  await ctx.streams.documents.update('shared', id, [
    { type: 'set', path: 'content', value: content },
    { type: 'set', path: 'updatedAt', value: Date.now() }
  ])
  // All connected clients receive the update
  
  return { status: 200, body: { updated: true } }
})

Live dashboard

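// Assumes cron is exported from 'motia' alongside the http and stream triggers
import { step, cron } from 'motia'

export const updateMetrics = step({
  name: 'update-metrics',
  triggers: [cron('*/30 * * * * *')] // Every 30 seconds
}, async (_, ctx) => {
  // fetchCurrentMetrics is an application-defined helper
  const metrics = await fetchCurrentMetrics()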
  
  await ctx.streams.metrics.set('live', 'current', {
    id: 'current',
    ...metrics,
    timestamp: Date.now()
  })
  // Dashboard subscribers see live updates
})

Presence tracking

import { step, stream } from 'motia'

export const userPresence = step({
  name: 'track-presence',
  triggers: [stream('users')]
}, async (input, ctx) => {
  const { event, id } = input
  
  if (event.type === 'update' && event.data.status) {
    await ctx.streams.presence.set('global', id, {
      id,
      userId: id,
      status: event.data.status,
      lastSeen: Date.now()
    })
  }
})

Best practices

Use groups for isolation

Group related data together (e.g., by workspace, team, or user) for efficient filtering and access control.

Validate in onJoin

Always authenticate and authorize stream connections in the onJoin handler.

Keep data normalized

Store minimal data in streams. Use references to other streams or state for complex relationships.
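
As an illustration of storing references instead of nested copies, the sketch below keeps only an authorId on the stream item and joins the author at read time. All names (`AuthorRecord`, `DocItem`, `withAuthor`) and the sample author are hypothetical; the Map stands in for a lookup in state or another stream.

```typescript
// Hypothetical shapes: the stream item stores an authorId reference, not the author record.
type AuthorRecord = { id: string; name: string }
type DocItem = { id: string; title: string; authorId: string }

// Stand-in for a lookup in state or another stream.
const authorsById = new Map<string, AuthorRecord>([
  ['user-42', { id: 'user-42', name: 'Ada' }],
])

// Join the reference at read time instead of duplicating author data in every item.
function withAuthor(doc: DocItem, authors: Map<string, AuthorRecord>) {
  return { ...doc, author: authors.get(doc.authorId) ?? null }
}

const hydrated = withAuthor({ id: 'doc-123', title: 'Proposal', authorId: 'user-42' }, authorsById)
```

With this layout, renaming an author touches one record instead of every document that mentions them, and stream updates stay small.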

Handle disconnections

Have clients reconnect after a dropped connection and resolve conflicts between their local copy and changes made while they were offline.
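
Two common building blocks for this are exponential backoff between reconnection attempts and a last-write-wins rule for conflicting copies. The sketch below shows one possible version of each. Both helpers are hypothetical and not part of any Motia SDK, the delay values are arbitrary, and last-write-wins is only one of several conflict strategies.

```typescript
// Hypothetical client helpers; neither is part of a Motia SDK.

// Exponential backoff: 500 ms, 1 s, 2 s, ... capped at maxMs.
function reconnectDelay(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt))
}

type Versioned = { id: string; updatedAt: number }

// Last-write-wins: keep whichever copy has the newer updatedAt timestamp.
// On a tie the remote copy wins, so all clients converge on the server value.
function resolveConflict<T extends Versioned>(local: T, remote: T): T {
  return remote.updatedAt >= local.updatedAt ? remote : local
}

const localCopy = { id: 'doc-123', updatedAt: 200, content: 'edited offline' }
const remoteCopy = { id: 'doc-123', updatedAt: 150, content: 'server version' }
const winner = resolveConflict(localCopy, remoteCopy)
```

Last-write-wins silently discards the older edit, so it suits fields like status or metadata better than long-form text that several users edit at once.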

Use custom events sparingly

Custom events are powerful but can complicate client logic. Prefer standard create/update/delete when possible.

Next steps

  • Triggers: Learn about stream triggers
  • State management: Compare with the state API
  • Context API: Explore FlowContext
  • Client SDK: Connect clients to streams
