Stream triggers allow you to react to create, update, and delete events on stream data, enabling real-time data processing and event-driven workflows.

Basic usage

import { step, stream } from 'motia'

export const config = step({
  name: 'on-event',
  triggers: [stream('events')],
})

export const handler = async (input, ctx) => {
  const { streamName, groupId, id, event } = input
  
  ctx.logger.info('Stream event received', {
    stream: streamName,
    group: groupId,
    id,
    type: event.type,
  })
}

Input structure

Stream triggers receive an input object with the following structure:
  • type ('stream', required): Always 'stream' for stream triggers
  • timestamp (number, required): Unix timestamp in milliseconds when the event occurred
  • streamName (string, required): Name of the stream where the event occurred
  • groupId (string, required): Group identifier within the stream
  • id (string, required): Unique identifier for the item
  • event (StreamEvent<T>, required): Event object containing:
      • type: 'create', 'update', or 'delete'
      • data: The item data (for create and update events)
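
The fields above can be written down as TypeScript types, which helps when a handler needs type-safe access to event.data. This is a minimal sketch based only on the field list in this section: StreamTriggerInput, Order, describeInput, and sampleInput are illustrative names, not exports of motia, and data is marked optional because the list only promises it for create and update events.

```typescript
// Hypothetical local types mirroring the documented input fields.
// They are NOT imported from 'motia'; the names are assumptions for illustration.
type StreamEvent<T> = {
  type: 'create' | 'update' | 'delete'
  data?: T // documented for create and update events
}

type StreamTriggerInput<T> = {
  type: 'stream'
  timestamp: number // Unix timestamp in milliseconds
  streamName: string
  groupId: string
  id: string
  event: StreamEvent<T>
}

// Example item shape, assumed for this sketch.
type Order = { total: number }

// Builds a short, log-friendly description of an input.
function describeInput(input: StreamTriggerInput<Order>): string {
  const when = new Date(input.timestamp).toISOString()
  return `${input.event.type} ${input.streamName}/${input.groupId}/${input.id} at ${when}`
}

const sampleInput: StreamTriggerInput<Order> = {
  type: 'stream',
  timestamp: 0,
  streamName: 'orders',
  groupId: 'users',
  id: 'order-1',
  event: { type: 'create', data: { total: 42 } },
}
```

Typing the handler's input this way lets the compiler flag typos in field names before the step runs.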

Event types

Create events

Triggered when a new item is added:
export const handler = async (input, ctx) => {
  if (input.event.type === 'create') {
    const newItem = input.event.data
    
    ctx.logger.info('Item created', {
      id: input.id,
      data: newItem,
    })
  }
}

Update events

Triggered when an existing item is modified:
export const handler = async (input, ctx) => {
  if (input.event.type === 'update') {
    const updatedItem = input.event.data
    
    ctx.logger.info('Item updated', {
      id: input.id,
      data: updatedItem,
    })
  }
}

Delete events

Triggered when an item is removed:
export const handler = async (input, ctx) => {
  if (input.event.type === 'delete') {
    const deletedItem = input.event.data
    
    ctx.logger.info('Item deleted', {
      id: input.id,
      data: deletedItem,
    })
  }
}
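
The three handlers above each check a single event type. When one step has to handle all of them, a switch with an exhaustiveness check keeps the branches in one place. The following is a sketch under the event shape described in Input structure; ItemEvent and summarizeEvent are illustrative names rather than part of motia, and data is treated as optional on delete as a cautious assumption.

```typescript
// Local model of the event union; an assumption for illustration only.
type ItemEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data?: T }

// Returns the log message a handler might emit for each event type.
function summarizeEvent<T>(id: string, ev: ItemEvent<T>): string {
  switch (ev.type) {
    case 'create':
      return `Item created: ${id}`
    case 'update':
      return `Item updated: ${id}`
    case 'delete':
      return `Item deleted: ${id}`
    default: {
      // Compile-time guard: this stops type-checking if a new event type is added
      const unreachable: never = ev
      throw new Error(`Unhandled event: ${JSON.stringify(unreachable)}`)
    }
  }
}
```

Inside a real handler, the returned string could be passed to ctx.logger.info together with input.id.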

Filtering by group

Listen to events from a specific group:
import { step, stream } from 'motia'

export const config = step({
  name: 'on-user-events',
  triggers: [
    stream('events', {
      groupId: 'users',
    }),
  ],
})

export const handler = async (input, ctx) => {
  // Only receives events from 'users' group
  ctx.logger.info('User event', {
    userId: input.id,
    type: input.event.type,
  })
}

Filtering by item

Listen to events for a specific item:
import { step, stream } from 'motia'

export const config = step({
  name: 'on-specific-user',
  triggers: [
    stream('events', {
      groupId: 'users',
      itemId: 'user-123',
    }),
  ],
})

export const handler = async (input, ctx) => {
  // Only receives events for user-123
  ctx.logger.info('User 123 event', {
    type: input.event.type,
  })
}

Conditional triggers

Filter events with custom conditions:
import { step, stream } from 'motia'

export const config = step({
  name: 'on-high-value-order',
  triggers: [
    stream('orders', {
      condition: (input, ctx) => {
        const { event } = input
        return event.type === 'create' && event.data.total > 1000
      },
    }),
  ],
})

export const handler = async (input, ctx) => {
  ctx.logger.info('High value order created', {
    orderId: input.id,
    total: input.event.data.total,
  })
}
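
An inline condition is hard to test on its own. One option is to pull it out into a named predicate that can be exercised without running the step. The sketch below restates the rule from the example above; OrderEvent and isHighValueOrder are hypothetical names, and the optional chaining is an added safeguard for events that carry no data or no total.

```typescript
// Local model of an order event; an assumption for illustration only.
type OrderEvent = {
  type: 'create' | 'update' | 'delete'
  data?: { total?: number }
}

// Same rule as the inline condition, but tolerant of missing data:
// a missing total is treated as 0 and therefore never matches.
function isHighValueOrder(input: { event: OrderEvent }): boolean {
  const orderEvent = input.event
  return orderEvent.type === 'create' && (orderEvent.data?.total ?? 0) > 1000
}
```

Because the predicate only reads input, it should be usable as the condition option in place of the inline arrow function.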

Accessing streams

Interact with streams from handlers:
export const handler = async (input, ctx) => {
  const stream = ctx.streams.events
  
  // Get item
  const item = await stream.get(input.groupId, input.id)
  
  // Update item
  await stream.update(input.groupId, input.id, [
    { type: 'set', path: 'processed', value: true },
  ])
  
  // List items in group
  const items = await stream.list(input.groupId)
  
  ctx.logger.info('Stream accessed', {
    itemCount: items.length,
  })
}

Configuration options

  • streamName (string, required): Name of the stream to listen to
  • groupId (string, optional): Group identifier to filter events. If specified, only events from this group trigger the workflow
  • itemId (string, optional): Item identifier to filter events. Requires groupId. If specified, only events for this specific item trigger the workflow
  • condition (function, optional): Function (input, ctx) => boolean to filter events. It receives:
      • input.type: Always 'stream'
      • input.streamName: Stream name
      • input.groupId: Group identifier
      • input.id: Item identifier
      • input.event.type: Event type: 'create', 'update', or 'delete'
      • input.event.data: Item data
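
One way to reason about how these options combine is to model them as a single matching function. The sketch below is a mental model only, not Motia's implementation: it assumes every option that is set must match (a logical AND), and StreamFilter, TriggerInput, and wouldTrigger are hypothetical names.

```typescript
// Local models of the documented shapes; assumptions for illustration only.
type TriggerInput = {
  streamName: string
  groupId: string
  id: string
  event: { type: 'create' | 'update' | 'delete'; data?: unknown }
}

type StreamFilter = {
  streamName: string
  groupId?: string
  itemId?: string
  condition?: (input: TriggerInput) => boolean
}

// Returns true when an event would pass every filter that is set.
function wouldTrigger(filter: StreamFilter, input: TriggerInput): boolean {
  // The options list states that itemId requires groupId.
  if (filter.itemId !== undefined && filter.groupId === undefined) {
    throw new Error('itemId requires groupId')
  }
  if (filter.streamName !== input.streamName) return false
  if (filter.groupId !== undefined && filter.groupId !== input.groupId) return false
  if (filter.itemId !== undefined && filter.itemId !== input.id) return false
  // With no condition set, any event that passed the checks above matches.
  return filter.condition ? filter.condition(input) : true
}
```

Under this model, groupId and itemId narrow which items are watched, while condition decides per event.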

Use cases

Real-time notifications

Notify users of changes:
import { step, stream } from 'motia'

export const config = step({
  name: 'notify-followers',
  triggers: [
    stream('posts', {
      condition: (input) => input.event.type === 'create',
    }),
  ],
  enqueues: ['send-notification'],
})

export const handler = async (input, ctx) => {
  const post = input.event.data
  const authorId = post.authorId
  
  // Get followers
  const followersStream = ctx.streams.followers
  const followers = await followersStream.list(authorId)
  
  // Queue notifications
  for (const follower of followers) {
    await ctx.enqueue({
      topic: 'send-notification',
      data: {
        userId: follower.id,
        message: `${post.author} posted: ${post.title}`,
      },
    })
  }
}
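
The loop above mixes building the notification payload with enqueueing it. Separating the two makes the payload easy to check in isolation. This is a sketch: Post, Follower, NotificationJob, and buildNotifications are hypothetical names, and the field shapes are inferred from the example rather than taken from a schema.

```typescript
// Shapes inferred from the example above; assumptions for illustration only.
type Post = { author: string; title: string }
type Follower = { id: string }
type NotificationJob = {
  topic: 'send-notification'
  data: { userId: string; message: string }
}

// Builds one job per follower without touching the queue.
function buildNotifications(post: Post, followers: Follower[]): NotificationJob[] {
  return followers.map((follower) => ({
    topic: 'send-notification',
    data: {
      userId: follower.id,
      message: `${post.author} posted: ${post.title}`,
    },
  }))
}
```

The handler would then iterate over the returned jobs and pass each one to ctx.enqueue.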

Data synchronization

Sync data to external systems:
import { step, stream } from 'motia'

export const config = step({
  name: 'sync-to-external',
  triggers: [stream('products')],
})

export const handler = async (input, ctx) => {
  const { event, id } = input
  
  // externalAPI is a placeholder for your own API client; it is not defined in this example
  switch (event.type) {
    case 'create':
      await externalAPI.createProduct(event.data)
      break
    case 'update':
      await externalAPI.updateProduct(id, event.data)
      break
    case 'delete':
      await externalAPI.deleteProduct(id)
      break
  }
  
  ctx.logger.info('Synced to external system', { id })
}
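
The mapping from event type to external call can be expressed as a pure function, which keeps the decision testable without a live external system. A minimal sketch, assuming the three method names used in the example above; Product, ProductEvent, SyncCall, and planSync are hypothetical names.

```typescript
// Local models; assumptions for illustration only.
type Product = { name: string }
type ProductEvent =
  | { type: 'create' | 'update'; data: Product }
  | { type: 'delete'; data?: Product }

type SyncCall = {
  method: 'createProduct' | 'updateProduct' | 'deleteProduct'
  args: unknown[]
}

// Decides which external call an event maps to, without performing it.
function planSync(id: string, ev: ProductEvent): SyncCall {
  switch (ev.type) {
    case 'create':
      return { method: 'createProduct', args: [ev.data] }
    case 'update':
      return { method: 'updateProduct', args: [id, ev.data] }
    case 'delete':
      return { method: 'deleteProduct', args: [id] }
  }
}
```

A handler could call planSync first and then invoke the matching method on its own client.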

Analytics tracking

Track user activity:
import { step, stream } from 'motia'

export const config = step({
  name: 'track-activity',
  triggers: [stream('user-actions')],
})

export const handler = async (input, ctx) => {
  const { event, groupId, timestamp } = input
  
  if (event.type === 'create') {
    const action = event.data
    
    // Track to analytics (analytics is a placeholder for your own client; it is not defined in this example)
    await analytics.track({
      userId: groupId,
      event: action.type,
      properties: action.properties,
      timestamp,
    })
    
    // Update user stats
    await ctx.state.update('user-stats', groupId, [
      { type: 'set', path: 'lastAction', value: timestamp },
      { type: 'increment', path: 'actionCount', value: 1 },
    ])
  }
}
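
The update call above passes a list of operations. As a mental model only, the sketch below applies 'set' and 'increment' operations to a plain object. It is not Motia's implementation: it assumes top-level paths and assumes that incrementing a missing or non-numeric field starts from 0. UpdateOp and applyOps are hypothetical names.

```typescript
// Local model of the two operation kinds used above; an assumption for illustration.
type UpdateOp =
  | { type: 'set'; path: string; value: unknown }
  | { type: 'increment'; path: string; value: number }

// Applies operations in order and returns a new object; the input is not mutated.
function applyOps(
  current: Record<string, unknown>,
  ops: UpdateOp[],
): Record<string, unknown> {
  const next: Record<string, unknown> = { ...current }
  for (const op of ops) {
    if (op.type === 'set') {
      next[op.path] = op.value
    } else {
      // Assumption: a missing or non-numeric field counts as 0
      const existing = next[op.path]
      const base = typeof existing === 'number' ? existing : 0
      next[op.path] = base + op.value
    }
  }
  return next
}
```

Expressing the change as operations rather than a read-modify-write in the handler keeps each update self-describing.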

Cache invalidation

Invalidate caches on data changes:
import { step, stream } from 'motia'

export const config = step({
  name: 'invalidate-cache',
  triggers: [stream('catalog')],
})

export const handler = async (input, ctx) => {
  const { groupId, id, event } = input
  
  if (event.type === 'update' || event.type === 'delete') {
    // Invalidate related caches (cache is a placeholder for your own cache client; it is not defined in this example)
    await cache.delete(`product:${id}`)
    await cache.delete(`category:${groupId}`)
    
    ctx.logger.info('Cache invalidated', {
      productId: id,
      categoryId: groupId,
    })
  }
}
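
The set of keys to invalidate can be computed separately from the cache calls, so the key scheme is checked in one place. A minimal sketch mirroring the example above; cacheKeysToInvalidate is a hypothetical helper, and the product: and category: prefixes are taken from the example.

```typescript
type CatalogEventType = 'create' | 'update' | 'delete'

// Returns the cache keys affected by an event, following the example above:
// only update and delete events invalidate anything.
function cacheKeysToInvalidate(
  groupId: string,
  id: string,
  eventType: CatalogEventType,
): string[] {
  if (eventType === 'create') return []
  return [`product:${id}`, `category:${groupId}`]
}
```

The handler would then loop over the returned keys and delete each one from its cache client.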
