Streams provide real-time, reactive data management in Motia. Unlike state, streams automatically broadcast changes to connected clients and trigger workflow steps, making them ideal for collaborative features, live updates, and reactive architectures.
Overview
Streams organize data in a three-level hierarchy:
stream: documents
├─ group: workspace-1
│ ├─ item: doc-123 → { title: 'Proposal', content: '...' }
│ ├─ item: doc-456 → { title: 'Meeting Notes', content: '...' }
│ └─ item: doc-789 → { title: 'Roadmap', content: '...' }
└─ group: workspace-2
├─ item: doc-abc → { title: 'Design', content: '...' }
└─ item: doc-xyz → { title: 'Specs', content: '...' }
Stream : Top-level container (e.g., documents, users, tasks)
Group : Logical partition within a stream (e.g., workspace ID, user ID)
Item : Individual data record with a unique ID
Streams vs state
Real-time collaboration (shared documents, chat, presence)
Live dashboards and monitoring
Reactive workflows triggered by data changes
Client-server data sync
Multi-user applications
Simple key-value storage
Session management
Rate limiting
Feature flags
Cache
Data that doesn’t need real-time sync
Key differences:
Feature Streams State Real-time sync Yes No Client subscriptions Yes No Change events Automatic Manual via triggers Structure Stream/Group/Item Scope/Key Use case Collaborative, reactive Simple storage
Defining streams
Streams are defined in your project configuration:
// motia/streams/documents.stream.ts
import { stream , jsonSchema } from 'motia'
import { z } from 'zod'
const documentSchema = z . object ({
id: z . string (),
title: z . string (),
content: z . string (),
updatedAt: z . number (),
authorId: z . string ()
})
export default stream ({
name: 'documents' ,
schema: jsonSchema ( documentSchema ) ,
baseConfig: { storageType: 'default' } ,
// Optional: authenticate connections
onJoin : async ( subscription , ctx , authContext ) => {
const { groupId } = subscription
// Verify user has access to workspace
const workspace = await ctx . state . get ( 'workspaces' , groupId )
const userId = authContext ?. userId
if ( ! workspace ?. members ?. includes ( userId )) {
return { authorized: false }
}
return {
authorized: true ,
context: { userId , workspaceId: groupId }
}
} ,
// Optional: cleanup on disconnect
onLeave : async ( subscription , ctx , authContext ) => {
ctx . logger . info ( 'User left stream' , {
userId: authContext ?. userId ,
groupId: subscription . groupId
})
}
})
# motia/streams/documents_stream.py
from motia import stream
from motia.types_stream import StreamConfig
async def on_join ( subscription , ctx , auth_context ):
group_id = subscription.group_id
# Verify user has access to workspace
workspace = await ctx.state.get( 'workspaces' , group_id)
user_id = auth_context.get( 'userId' ) if auth_context else None
if not workspace or user_id not in workspace.get( 'members' , []):
return { 'authorized' : False }
return {
'authorized' : True ,
'context' : { 'userId' : user_id, 'workspaceId' : group_id}
}
async def on_leave ( subscription , ctx , auth_context ):
ctx.logger.info( 'User left stream' , extra = {
'userId' : auth_context.get( 'userId' ) if auth_context else None ,
'groupId' : subscription.group_id
})
default = StreamConfig(
name = 'documents' ,
schema = {
'type' : 'object' ,
'properties' : {
'id' : { 'type' : 'string' },
'title' : { 'type' : 'string' },
'content' : { 'type' : 'string' },
'updatedAt' : { 'type' : 'number' },
'authorId' : { 'type' : 'string' }
}
},
base_config = { 'storageType' : 'default' },
on_join = on_join,
on_leave = on_leave
)
Streams defined in your motia/streams/ directory are automatically registered and accessible via ctx.streams.
Accessing streams
Access streams in step handlers via ctx.streams:
import { step , http } from 'motia'
export default step ({
name: 'get-document' ,
triggers: [ http ( 'GET' , '/workspaces/:workspaceId/documents/:docId' )]
}, async ( input , ctx ) => {
const { workspaceId , docId } = input . request . pathParams
// Access the documents stream
const documents = ctx . streams . documents
// Get the document
const doc = await documents . get ( workspaceId , docId )
if ( ! doc ) {
return { status: 404 , body: { error: 'Document not found' } }
}
return { status: 200 , body: doc }
} )
from motia import step, http
default = step({
'name' : 'get-document' ,
'triggers' : [http( 'GET' , '/workspaces/ {workspaceId} /documents/ {docId} ' )]
}, async def handler( input , ctx):
workspace_id = input .request.path_params[ 'workspaceId' ]
doc_id = input .request.path_params[ 'docId' ]
# Access the documents stream
documents = ctx.streams[ 'documents' ]
# Get the document
doc = await documents.get(workspace_id, doc_id)
if not doc:
return { 'status' : 404 , 'body' : { 'error' : 'Document not found' }}
return { 'status' : 200 , 'body' : doc}
)
Stream operations
Get
Retrieve an item from a stream:
const doc = await ctx . streams . documents . get ( 'workspace-1' , 'doc-123' )
// Returns: { id: 'doc-123', title: '...', content: '...', ... } | null
doc = await ctx.streams[ 'documents' ].get( 'workspace-1' , 'doc-123' )
# Returns: {'id': 'doc-123', 'title': '...', 'content': '...', ...} or None
Set
Create or replace an item:
await ctx . streams . documents . set ( 'workspace-1' , 'doc-123' , {
id: 'doc-123' ,
title: 'My Document' ,
content: 'Content here...' ,
updatedAt: Date . now (),
authorId: 'user-42'
})
// Broadcasts 'create' or 'update' event to subscribers
from time import time
await ctx.streams[ 'documents' ].set( 'workspace-1' , 'doc-123' , {
'id' : 'doc-123' ,
'title' : 'My Document' ,
'content' : 'Content here...' ,
'updatedAt' : int (time() * 1000 ),
'authorId' : 'user-42'
})
# Broadcasts 'create' or 'update' event to subscribers
Update
Apply partial updates:
import { step , http } from 'motia'
export default step ({
name: 'edit-document' ,
triggers: [ http ( 'PATCH' , '/workspaces/:workspaceId/documents/:docId' )]
}, async ( input , ctx ) => {
const { workspaceId , docId } = input . request . pathParams
const { content } = input . request . body
await ctx . streams . documents . update ( workspaceId , docId , [
{ type: 'set' , path: 'content' , value: content },
{ type: 'set' , path: 'updatedAt' , value: Date . now () }
])
// Broadcasts 'update' event with new values
return { status: 200 , body: { updated: true } }
} )
from motia import step, http
from time import time
default = step({
'name' : 'edit-document' ,
'triggers' : [http( 'PATCH' , '/workspaces/ {workspaceId} /documents/ {docId} ' )]
}, async def handler( input , ctx):
workspace_id = input .request.path_params[ 'workspaceId' ]
doc_id = input .request.path_params[ 'docId' ]
content = input .request.body[ 'content' ]
await ctx.streams[ 'documents' ].update(workspace_id, doc_id, [
{ 'type' : 'set' , 'path' : 'content' , 'value' : content},
{ 'type' : 'set' , 'path' : 'updatedAt' , 'value' : int (time() * 1000 )}
])
# Broadcasts 'update' event with new values
return { 'status' : 200 , 'body' : { 'updated' : True }}
)
Delete
Remove an item:
await ctx . streams . documents . delete ( 'workspace-1' , 'doc-123' )
// Broadcasts 'delete' event to subscribers
await ctx.streams[ 'documents' ].delete( 'workspace-1' , 'doc-123' )
# Broadcasts 'delete' event to subscribers
List
Get all items in a group:
const docs = await ctx . streams . documents . list ( 'workspace-1' )
// Returns: Array<{ id: string, title: string, ... }>
docs = await ctx.streams[ 'documents' ].list( 'workspace-1' )
# Returns: list of document objects
List groups
Get all group IDs:
const workspaceIds = await ctx . streams . documents . listGroups ()
// Returns: ['workspace-1', 'workspace-2', ...]
workspace_ids = await ctx.streams[ 'documents' ].list_groups()
# Returns: ['workspace-1', 'workspace-2', ...]
Stream triggers
React to stream changes with stream triggers:
import { step , stream } from 'motia'
export default step ({
name: 'on-document-change' ,
triggers: [
stream ( 'documents' , {
groupId: 'workspace-123' , // Optional: filter by group
itemId: 'doc-456' , // Optional: filter by item
})
] ,
enqueues: [ 'document-indexed' ]
}, async ( input , ctx ) => {
const { event , groupId , id } = input
switch ( event . type ) {
case 'create' :
ctx . logger . info ( 'Document created' , { id , data: event . data })
await indexDocument ( event . data )
break
case 'update' :
ctx . logger . info ( 'Document updated' , { id , data: event . data })
await updateIndex ( event . data )
break
case 'delete' :
ctx . logger . info ( 'Document deleted' , { id })
await removeFromIndex ( id )
break
}
await ctx . enqueue ({
topic: 'document-indexed' ,
data: { groupId , id , type: event . type }
})
} )
from motia import step, stream
default = step({
'name' : 'on-document-change' ,
'triggers' : [
stream( 'documents' ,
group_id = 'workspace-123' , # Optional: filter by group
item_id = 'doc-456' ) # Optional: filter by item
],
'enqueues' : [ 'document-indexed' ]
}, async def handler( input , ctx):
event = input [ 'event' ]
group_id = input [ 'groupId' ]
item_id = input [ 'id' ]
if event[ 'type' ] == 'create' :
ctx.logger.info( 'Document created' ,
extra = { 'id' : item_id, 'data' : event[ 'data' ]})
await index_document(event[ 'data' ])
elif event[ 'type' ] == 'update' :
ctx.logger.info( 'Document updated' ,
extra = { 'id' : item_id, 'data' : event[ 'data' ]})
await update_index(event[ 'data' ])
elif event[ 'type' ] == 'delete' :
ctx.logger.info( 'Document deleted' , extra = { 'id' : item_id})
await remove_from_index(item_id)
await ctx.enqueue({
'topic' : 'document-indexed' ,
'data' : { 'groupId' : group_id, 'id' : item_id, 'type' : event[ 'type' ]}
})
)
interface StreamTriggerInput < T > {
type : 'stream'
timestamp : number // When the event occurred
streamName : string // Name of the stream
groupId : string // Group ID
id : string // Item ID
event : StreamEvent < T > // The event data
}
type StreamEvent < T > =
| { type : 'create' ; data : T }
| { type : 'update' ; data : T }
| { type : 'delete' ; data : T }
class StreamTriggerInput :
type : Literal[ 'stream' ]
timestamp: int # When the event occurred
stream_name: str # Name of the stream
group_id: str # Group ID
id : str # Item ID
event: StreamEvent # The event data
class StreamEvent :
type : Literal[ 'create' , 'update' , 'delete' ]
data: Any
Custom events
Send custom events to stream subscribers:
import { step , http } from 'motia'
export default step ({
name: 'notify-collaborators' ,
triggers: [ http ( 'POST' , '/documents/:docId/notify' )]
}, async ( input , ctx ) => {
const { docId } = input . request . pathParams
const { message } = input . request . body
await ctx . streams . documents . send (
{ groupId: 'workspace-1' , id: docId },
{
type: 'notification' ,
data: { message , timestamp: Date . now () }
}
)
// Subscribers receive this custom event
return { status: 200 , body: { sent: true } }
} )
from motia import step, http
from time import time
default = step({
'name' : 'notify-collaborators' ,
'triggers' : [http( 'POST' , '/documents/ {docId} /notify' )]
}, async def handler( input , ctx):
doc_id = input .request.path_params[ 'docId' ]
message = input .request.body[ 'message' ]
await ctx.streams[ 'documents' ].send(
{ 'groupId' : 'workspace-1' , 'id' : doc_id},
{
'type' : 'notification' ,
'data' : { 'message' : message, 'timestamp' : int (time() * 1000 )}
}
)
# Subscribers receive this custom event
return { 'status' : 200 , 'body' : { 'sent' : True }}
)
Client connections
Clients connect to streams via WebSocket:
// Client-side code
import { connectStream } from '@motia/client'
const stream = connectStream ( 'documents' , {
groupId: 'workspace-1' ,
auth: { token: userToken }
})
stream . on ( 'create' , ( data ) => {
console . log ( 'Document created:' , data )
})
stream . on ( 'update' , ( data ) => {
console . log ( 'Document updated:' , data )
})
stream . on ( 'delete' , ( data ) => {
console . log ( 'Document deleted:' , data )
})
stream . on ( 'notification' , ( data ) => {
console . log ( 'Custom event:' , data )
})
Use cases
Collaborative editor
import { step , http , stream } from 'motia'
export const editDocument = step ({
name: 'edit-document' ,
triggers: [ http ( 'PATCH' , '/docs/:id' )]
}, async ( input , ctx ) => {
const { id } = input . request . pathParams
const { content , cursorPosition } = input . request . body
await ctx . streams . documents . update ( 'shared' , id , [
{ type: 'set' , path: 'content' , value: content },
{ type: 'set' , path: 'updatedAt' , value: Date . now () }
])
// All connected clients receive the update
return { status: 200 , body: { updated: true } }
})
Live dashboard
export const updateMetrics = step ({
name: 'update-metrics' ,
triggers: [ cron ( '*/30 * * * * *' )] // Every 30 seconds
}, async ( _ , ctx ) => {
const metrics = await fetchCurrentMetrics ()
await ctx . streams . metrics . set ( 'live' , 'current' , {
id: 'current' ,
... metrics ,
timestamp: Date . now ()
})
// Dashboard subscribers see live updates
})
Presence tracking
export const userPresence = step ({
name: 'track-presence' ,
triggers: [ stream ( 'users' )]
}, async ( input , ctx ) => {
const { event , id } = input
if ( event . type === 'update' && event . data . status ) {
await ctx . streams . presence . set ( 'global' , id , {
id ,
userId: id ,
status: event . data . status ,
lastSeen: Date . now ()
})
}
})
Best practices
Use groups for isolation Group related data together (e.g., by workspace, team, or user) for efficient filtering and access control.
Validate in onJoin Always authenticate and authorize stream connections in the onJoin handler.
Keep data normalized Store minimal data in streams. Use references to other streams or state for complex relationships.
Handle disconnections Implement reconnection logic and conflict resolution in clients.
Use custom events sparingly Custom events are powerful but can complicate client logic. Prefer standard create/update/delete when possible.
Next steps
Triggers Learn about stream triggers
State management Compare with state API
Context API Explore FlowContext
Client SDK Connect clients to streams