Stream triggers allow you to react to create, update, and delete events on stream data, enabling real-time data processing and event-driven workflows.
Basic usage
import { step, stream } from 'motia'
export const config = step({
name: 'on-event',
triggers: [stream('events')],
})
export const handler = async (input, ctx) => {
const { streamName, groupId, id, event } = input
ctx.logger.info('Stream event received', {
stream: streamName,
group: groupId,
id,
type: event.type,
})
}
from motia import step, stream
config = step(
name='on-event',
triggers=[stream('events')],
)
async def handler(input, ctx):
stream_name = input['streamName']
group_id = input['groupId']
item_id = input['id']
event = input['event']
ctx.logger.info('Stream event received', {
'stream': stream_name,
'group': group_id,
'id': item_id,
'type': event['type'],
})
Stream triggers receive an input object with the following structure:
Always 'stream' for stream triggers
Unix timestamp (milliseconds) when the event occurred
Name of the stream where the event occurred
Group identifier within the stream
Unique identifier for the item
Event object containing:
type: 'create', 'update', or 'delete'
data: The item data (for create and update events)
Event types
Create events
Triggered when a new item is added:
export const handler = async (input, ctx) => {
if (input.event.type === 'create') {
const newItem = input.event.data
ctx.logger.info('Item created', {
id: input.id,
data: newItem,
})
}
}
async def handler(input, ctx):
event = input['event']
if event['type'] == 'create':
new_item = event['data']
ctx.logger.info('Item created', {
'id': input['id'],
'data': new_item,
})
Update events
Triggered when an existing item is modified:
export const handler = async (input, ctx) => {
if (input.event.type === 'update') {
const updatedItem = input.event.data
ctx.logger.info('Item updated', {
id: input.id,
data: updatedItem,
})
}
}
async def handler(input, ctx):
event = input['event']
if event['type'] == 'update':
updated_item = event['data']
ctx.logger.info('Item updated', {
'id': input['id'],
'data': updated_item,
})
Delete events
Triggered when an item is removed:
export const handler = async (input, ctx) => {
if (input.event.type === 'delete') {
const deletedItem = input.event.data
ctx.logger.info('Item deleted', {
id: input.id,
data: deletedItem,
})
}
}
async def handler(input, ctx):
event = input['event']
if event['type'] == 'delete':
deleted_item = event['data']
ctx.logger.info('Item deleted', {
'id': input['id'],
'data': deleted_item,
})
Filtering by group
Listen to events from a specific group:
import { step, stream } from 'motia'
export const config = step({
name: 'on-user-events',
triggers: [
stream('events', {
groupId: 'users',
}),
],
})
export const handler = async (input, ctx) => {
// Only receives events from 'users' group
ctx.logger.info('User event', {
userId: input.id,
type: input.event.type,
})
}
from motia import step, stream
config = step(
name='on-user-events',
triggers=[
stream('events', group_id='users'),
],
)
async def handler(input, ctx):
# Only receives events from 'users' group
ctx.logger.info('User event', {
'userId': input['id'],
'type': input['event']['type'],
})
Filtering by item
Listen to events for a specific item:
import { step, stream } from 'motia'
export const config = step({
name: 'on-specific-user',
triggers: [
stream('events', {
groupId: 'users',
itemId: 'user-123',
}),
],
})
export const handler = async (input, ctx) => {
// Only receives events for user-123
ctx.logger.info('User 123 event', {
type: input.event.type,
})
}
from motia import step, stream
config = step(
name='on-specific-user',
triggers=[
stream('events', group_id='users', item_id='user-123'),
],
)
async def handler(input, ctx):
# Only receives events for user-123
ctx.logger.info('User 123 event', {
'type': input['event']['type'],
})
Conditional triggers
Filter events with custom conditions:
import { step, stream } from 'motia'
export const config = step({
name: 'on-high-value-order',
triggers: [
stream('orders', {
condition: (input, ctx) => {
const { event } = input
return event.type === 'create' && event.data.total > 1000
},
}),
],
})
export const handler = async (input, ctx) => {
ctx.logger.info('High value order created', {
orderId: input.id,
total: input.event.data.total,
})
}
from motia import step, stream
def high_value_condition(input, ctx):
event = input['event']
return event['type'] == 'create' and event['data']['total'] > 1000
config = step(
name='on-high-value-order',
triggers=[
stream('orders', condition=high_value_condition),
],
)
async def handler(input, ctx):
ctx.logger.info('High value order created', {
'orderId': input['id'],
'total': input['event']['data']['total'],
})
Accessing streams
Interact with streams from handlers:
export const handler = async (input, ctx) => {
const stream = ctx.streams.events
// Get item
const item = await stream.get(input.groupId, input.id)
// Update item
await stream.update(input.groupId, input.id, [
{ type: 'set', path: 'processed', value: true },
])
// List items in group
const items = await stream.list(input.groupId)
ctx.logger.info('Stream accessed', {
itemCount: items.length,
})
}
async def handler(input, ctx):
stream = ctx.streams['events']
# Get item
item = await stream.get(input['groupId'], input['id'])
# Update item
await stream.update(input['groupId'], input['id'], [
{'type': 'set', 'path': 'processed', 'value': True},
])
# List items in group
items = await stream.list(input['groupId'])
ctx.logger.info('Stream accessed', {
'itemCount': len(items),
})
Configuration options
Name of the stream to listen to
Optional group identifier to filter events. If specified, only events from this group trigger the workflow
Optional item identifier to filter events. Requires groupId. If specified, only events for this specific item trigger the workflow
Optional function (input, ctx) => boolean to filter events:
input.type - Always 'stream'
input.streamName - Stream name
input.groupId - Group identifier
input.id - Item identifier
input.event.type - Event type: 'create', 'update', or 'delete'
input.event.data - Item data
Use cases
Real-time notifications
Notify users of changes:
import { step, stream } from 'motia'
export const config = step({
name: 'notify-followers',
triggers: [
stream('posts', {
condition: (input) => input.event.type === 'create',
}),
],
enqueues: ['send-notification'],
})
export const handler = async (input, ctx) => {
const post = input.event.data
const authorId = post.authorId
// Get followers
const followersStream = ctx.streams.followers
const followers = await followersStream.list(authorId)
// Queue notifications
for (const follower of followers) {
await ctx.enqueue({
topic: 'send-notification',
data: {
userId: follower.id,
message: `${post.author} posted: ${post.title}`,
},
})
}
}
Data synchronization
Sync data to external systems:
import { step, stream } from 'motia'
export const config = step({
name: 'sync-to-external',
triggers: [stream('products')],
})
export const handler = async (input, ctx) => {
const { event, id } = input
switch (event.type) {
case 'create':
await externalAPI.createProduct(event.data)
break
case 'update':
await externalAPI.updateProduct(id, event.data)
break
case 'delete':
await externalAPI.deleteProduct(id)
break
}
ctx.logger.info('Synced to external system', { id })
}
Analytics tracking
Track user activity:
import { step, stream } from 'motia'
export const config = step({
name: 'track-activity',
triggers: [stream('user-actions')],
})
export const handler = async (input, ctx) => {
const { event, groupId, id, timestamp } = input
if (event.type === 'create') {
const action = event.data
// Track to analytics
await analytics.track({
userId: groupId,
event: action.type,
properties: action.properties,
timestamp,
})
// Update user stats
await ctx.state.update('user-stats', groupId, [
{ type: 'set', path: 'lastAction', value: timestamp },
{ type: 'increment', path: 'actionCount', value: 1 },
])
}
}
Cache invalidation
Invalidate caches on data changes:
import { step, stream } from 'motia'
export const config = step({
name: 'invalidate-cache',
triggers: [stream('catalog')],
})
export const handler = async (input, ctx) => {
const { groupId, id, event } = input
if (event.type === 'update' || event.type === 'delete') {
// Invalidate related caches
await cache.delete(`product:${id}`)
await cache.delete(`category:${groupId}`)
ctx.logger.info('Cache invalidated', {
productId: id,
categoryId: groupId,
})
}
}