Overview
FlowContext is provided as the second parameter to step handlers and trigger conditions. It provides access to logging, state management, event enqueueing, stream operations, and trigger information.
Type signature
interface FlowContext<TEnqueueData = never, TInput = unknown> {
enqueue: Enqueuer<TEnqueueData>
traceId: string
state: InternalStateManager
logger: Logger
streams: Streams
trigger: TriggerInfo
is: TriggerTypeGuards<TInput>
getData: () => ExtractDataPayload<TInput>
match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => Promise<TResult | undefined>
}
Properties
enqueue
Enqueue events to topics for asynchronous processing.
Function to enqueue messages to configured topics
type Enqueuer<TData> = (event: {
topic: string
data: TData
messageGroupId?: string
}) => Promise<void>
Example
export const { config, handler } = step(
{
name: 'ProcessOrder',
triggers: [http('POST', '/orders', { bodySchema: orderSchema })],
enqueues: ['notification', 'analytics'],
},
async (request, ctx) => {
const order = await createOrder(request.body)
// Enqueue notification
await ctx.enqueue({
topic: 'notification',
data: {
email: order.email,
templateId: 'order-confirmation',
orderId: order.id,
},
})
// Enqueue analytics event with message group for ordering
await ctx.enqueue({
topic: 'analytics',
data: { event: 'order_created', orderId: order.id },
messageGroupId: order.userId, // FIFO ordering per user
})
return { status: 200, body: order }
}
)
traceId
Unique identifier for tracing requests across steps.
Unique trace ID for the current execution
async (input, ctx) => {
ctx.logger.info('Processing request', { traceId: ctx.traceId })
return {
status: 200,
body: { traceId: ctx.traceId },
}
}
state
State manager for persisting and retrieving data.
State management interface for CRUD operations
See StateManager API for detailed documentation.
// Get a value
const order = await ctx.state.get<Order>('orders', orderId)
// Set a value
await ctx.state.set('orders', orderId, orderData)
// Update a value
await ctx.state.update('orders', orderId, [
{ op: 'replace', path: '/status', value: 'shipped' },
])
// Delete a value
await ctx.state.delete('orders', orderId)
// List all items in a group
const allOrders = await ctx.state.list<Order>('orders')
// Clear all items in a group
await ctx.state.clear('orders')
logger
Structured logger for debugging and monitoring.
Logger instance with info, warn, error, and debug methods
interface Logger {
info(message: string, meta?: Record<string, unknown>): void
warn(message: string, meta?: Record<string, unknown>): void
error(message: string, meta?: Record<string, unknown>): void
debug(message: string, meta?: Record<string, unknown>): void
}
Example
async (input, ctx) => {
ctx.logger.info('Processing started', { orderId: input.orderId })
try {
await processOrder(input)
ctx.logger.info('Processing complete')
} catch (error) {
ctx.logger.error('Processing failed', {
error: error.message,
orderId: input.orderId,
})
throw error
}
}
streams
Access to configured streams for real-time data.
Object containing all configured stream instances
See Stream API for detailed documentation.
// Access a configured stream
const status = await ctx.streams.parallelMerge.get(groupId, itemId)
await ctx.streams.parallelMerge.set(groupId, itemId, updatedStatus)
trigger
Information about the trigger that invoked this step.
Metadata about the current trigger
type TriggerInfo = {
type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
index?: number // Index in triggers array
path?: string // HTTP path
method?: string // HTTP method
topic?: string // Queue topic
expression?: string // Cron expression
}
Example
async (input, ctx) => {
ctx.logger.info('Trigger info', {
type: ctx.trigger.type,
topic: ctx.trigger.topic,
})
if (ctx.trigger.type === 'http') {
console.log(`Received ${ctx.trigger.method} ${ctx.trigger.path}`)
}
}
Methods
getData()
Extract the data payload from the input, regardless of trigger type.
getData
() => ExtractDataPayload<TInput>
Returns the data payload, unwrapping API requests to return just the body
- For HTTP triggers: returns
request.body
- For queue triggers: returns the queue data directly
- For cron triggers: returns
undefined
- For state triggers: returns the state trigger input
- For stream triggers: returns the stream trigger input
Example
import { step, http, queue } from 'motia'
const dataSchema = z.object({
userId: z.string(),
action: z.string(),
})
export const { config, handler } = step(
{
name: 'ProcessAction',
triggers: [
queue('user-actions', { input: dataSchema }),
http('POST', '/actions', { bodySchema: dataSchema }),
],
},
async (input, ctx) => {
// Works for both HTTP and queue triggers
const data = ctx.getData()
ctx.logger.info('Processing action', {
userId: data.userId,
action: data.action,
trigger: ctx.trigger.type,
})
await processAction(data)
}
)
Type guard utilities for checking trigger types.
is
TriggerTypeGuards<TInput>
Object with type guard methods for each trigger type
is: {
queue: (input: TInput) => input is QueueInput
http: (input: TInput) => input is ApiRequest
cron: (input: TInput) => input is never
state: (input: TInput) => input is StateTriggerInput
stream: (input: TInput) => input is StreamTriggerInput
}
Example
async (input, ctx) => {
if (ctx.is.http(input)) {
// TypeScript knows input is ApiRequest
const { pathParams, queryParams, body, headers } = input
ctx.logger.info('HTTP request', { path: ctx.trigger.path })
}
if (ctx.is.queue(input)) {
// TypeScript knows input is the queue data
ctx.logger.info('Queue message', { data: input })
}
if (ctx.is.state(input)) {
// TypeScript knows input is StateTriggerInput
ctx.logger.info('State change', {
groupId: input.group_id,
itemId: input.item_id,
oldValue: input.old_value,
newValue: input.new_value,
})
}
}
match()
Pattern match on trigger types to execute different logic.
match
(handlers: MatchHandlers) => Promise<TResult | undefined>
Execute different handlers based on trigger type
type MatchHandlers<TInput, TEnqueueData, TResult> = {
queue?: (input: QueueInput) => Promise<void>
http?: (request: ApiRequest) => Promise<TResult>
cron?: () => Promise<void>
state?: (input: StateTriggerInput) => Promise<TResult>
stream?: (input: StreamTriggerInput) => Promise<TResult>
default?: (input: TInput) => Promise<TResult | undefined>
}
Example
export const { config, handler } = step(
{
name: 'ProcessOrder',
triggers: [
queue('orders', { input: orderSchema }),
http('POST', '/orders', { bodySchema: orderSchema }),
],
},
async (input, ctx) => {
const data = ctx.getData()
const order = await createOrder(data)
// Return response only for HTTP, queue triggers return void
return ctx.match({
http: async () => ({
status: 200,
body: { success: true, order },
}),
queue: async () => {
ctx.logger.info('Order queued successfully', { orderId: order.id })
},
})
}
)
Usage patterns
Multi-trigger steps
Handle different trigger types with shared logic:
export const { config, handler } = step(
{
name: 'ProcessData',
triggers: [
http('POST', '/data', { bodySchema: dataSchema }),
queue('data-events', { input: dataSchema }),
cron('0 * * * *'),
],
},
async (input, ctx) => {
let data
if (ctx.is.http(input)) {
data = input.body
} else if (ctx.is.queue(input)) {
data = input
} else if (ctx.is.cron(input)) {
data = await fetchScheduledData()
}
await processData(data)
return ctx.match({
http: async () => ({ status: 200, body: { success: true } }),
})
}
)
State and logging
Combine state management with structured logging:
async (input, ctx) => {
const orderId = input.orderId
ctx.logger.info('Fetching order', { orderId, traceId: ctx.traceId })
const order = await ctx.state.get<Order>('orders', orderId)
if (!order) {
ctx.logger.warn('Order not found', { orderId })
return { status: 404, body: { error: 'Order not found' } }
}
ctx.logger.info('Order retrieved', { order })
return { status: 200, body: order }
}