Skip to main content

Overview

FlowContext is provided as the second parameter to step handlers and trigger conditions. It provides access to logging, state management, event enqueueing, stream operations, and trigger information.

Type signature

interface FlowContext<TEnqueueData = never, TInput = unknown> {
  enqueue: Enqueuer<TEnqueueData>
  traceId: string
  state: InternalStateManager
  logger: Logger
  streams: Streams
  trigger: TriggerInfo
  is: TriggerTypeGuards<TInput>
  getData: () => ExtractDataPayload<TInput>
  match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => Promise<TResult | undefined>
}

Properties

enqueue

Enqueue events to topics for asynchronous processing.
enqueue
Enqueuer<TEnqueueData>
Function to enqueue messages to configured topics
type Enqueuer<TData> = (event: {
  topic: string
  data: TData
  messageGroupId?: string
}) => Promise<void>

Example

export const { config, handler } = step(
  {
    name: 'ProcessOrder',
    triggers: [http('POST', '/orders', { bodySchema: orderSchema })],
    enqueues: ['notification', 'analytics'],
  },
  async (request, ctx) => {
    const order = await createOrder(request.body)
    
    // Enqueue notification
    await ctx.enqueue({
      topic: 'notification',
      data: {
        email: order.email,
        templateId: 'order-confirmation',
        orderId: order.id,
      },
    })
    
    // Enqueue analytics event with message group for ordering
    await ctx.enqueue({
      topic: 'analytics',
      data: { event: 'order_created', orderId: order.id },
      messageGroupId: order.userId, // FIFO ordering per user
    })
    
    return { status: 200, body: order }
  }
)

traceId

Unique identifier for tracing requests across steps.
traceId
string
Unique trace ID for the current execution
async (input, ctx) => {
  ctx.logger.info('Processing request', { traceId: ctx.traceId })
  
  return {
    status: 200,
    body: { traceId: ctx.traceId },
  }
}

state

State manager for persisting and retrieving data.
state
InternalStateManager
State management interface for CRUD operations
See StateManager API for detailed documentation.
// Get a value
const order = await ctx.state.get<Order>('orders', orderId)

// Set a value
await ctx.state.set('orders', orderId, orderData)

// Update a value
await ctx.state.update('orders', orderId, [
  { op: 'replace', path: '/status', value: 'shipped' },
])

// Delete a value
await ctx.state.delete('orders', orderId)

// List all items in a group
const allOrders = await ctx.state.list<Order>('orders')

// Clear all items in a group
await ctx.state.clear('orders')

logger

Structured logger for debugging and monitoring.
logger
Logger
Logger instance with info, warn, error, and debug methods
interface Logger {
  info(message: string, meta?: Record<string, unknown>): void
  warn(message: string, meta?: Record<string, unknown>): void
  error(message: string, meta?: Record<string, unknown>): void
  debug(message: string, meta?: Record<string, unknown>): void
}

Example

async (input, ctx) => {
  ctx.logger.info('Processing started', { orderId: input.orderId })
  
  try {
    await processOrder(input)
    ctx.logger.info('Processing complete')
  } catch (error) {
    ctx.logger.error('Processing failed', {
      error: error.message,
      orderId: input.orderId,
    })
    throw error
  }
}

streams

Access to configured streams for real-time data.
streams
Streams
Object containing all configured stream instances
See Stream API for detailed documentation.
// Access a configured stream
const status = await ctx.streams.parallelMerge.get(groupId, itemId)
await ctx.streams.parallelMerge.set(groupId, itemId, updatedStatus)

trigger

Information about the trigger that invoked this step.
trigger
TriggerInfo
Metadata about the current trigger
type TriggerInfo = {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number          // Index in triggers array
  path?: string          // HTTP path
  method?: string        // HTTP method
  topic?: string         // Queue topic
  expression?: string    // Cron expression
}

Example

async (input, ctx) => {
  ctx.logger.info('Trigger info', {
    type: ctx.trigger.type,
    topic: ctx.trigger.topic,
  })
  
  if (ctx.trigger.type === 'http') {
    console.log(`Received ${ctx.trigger.method} ${ctx.trigger.path}`)
  }
}

Methods

getData()

Extract the data payload from the input, regardless of trigger type.
getData
() => ExtractDataPayload<TInput>
Returns the data payload, unwrapping API requests to return just the body
  • For HTTP triggers: returns request.body
  • For queue triggers: returns the queue data directly
  • For cron triggers: returns undefined
  • For state triggers: returns the state trigger input
  • For stream triggers: returns the stream trigger input

Example

import { step, http, queue } from 'motia'

const dataSchema = z.object({
  userId: z.string(),
  action: z.string(),
})

export const { config, handler } = step(
  {
    name: 'ProcessAction',
    triggers: [
      queue('user-actions', { input: dataSchema }),
      http('POST', '/actions', { bodySchema: dataSchema }),
    ],
  },
  async (input, ctx) => {
    // Works for both HTTP and queue triggers
    const data = ctx.getData()
    
    ctx.logger.info('Processing action', {
      userId: data.userId,
      action: data.action,
      trigger: ctx.trigger.type,
    })
    
    await processAction(data)
  }
)

is

Type guard utilities for checking trigger types.
is
TriggerTypeGuards<TInput>
Object with type guard methods for each trigger type
is: {
  queue: (input: TInput) => input is QueueInput
  http: (input: TInput) => input is ApiRequest
  cron: (input: TInput) => input is never
  state: (input: TInput) => input is StateTriggerInput
  stream: (input: TInput) => input is StreamTriggerInput
}

Example

async (input, ctx) => {
  if (ctx.is.http(input)) {
    // TypeScript knows input is ApiRequest
    const { pathParams, queryParams, body, headers } = input
    ctx.logger.info('HTTP request', { path: ctx.trigger.path })
  }
  
  if (ctx.is.queue(input)) {
    // TypeScript knows input is the queue data
    ctx.logger.info('Queue message', { data: input })
  }
  
  if (ctx.is.state(input)) {
    // TypeScript knows input is StateTriggerInput
    ctx.logger.info('State change', {
      groupId: input.group_id,
      itemId: input.item_id,
      oldValue: input.old_value,
      newValue: input.new_value,
    })
  }
}

match()

Pattern match on trigger types to execute different logic.
match
(handlers: MatchHandlers) => Promise<TResult | undefined>
Execute different handlers based on trigger type
type MatchHandlers<TInput, TEnqueueData, TResult> = {
  queue?: (input: QueueInput) => Promise<void>
  http?: (request: ApiRequest) => Promise<TResult>
  cron?: () => Promise<void>
  state?: (input: StateTriggerInput) => Promise<TResult>
  stream?: (input: StreamTriggerInput) => Promise<TResult>
  default?: (input: TInput) => Promise<TResult | undefined>
}

Example

export const { config, handler } = step(
  {
    name: 'ProcessOrder',
    triggers: [
      queue('orders', { input: orderSchema }),
      http('POST', '/orders', { bodySchema: orderSchema }),
    ],
  },
  async (input, ctx) => {
    const data = ctx.getData()
    const order = await createOrder(data)
    
    // Return response only for HTTP, queue triggers return void
    return ctx.match({
      http: async () => ({
        status: 200,
        body: { success: true, order },
      }),
      queue: async () => {
        ctx.logger.info('Order queued successfully', { orderId: order.id })
      },
    })
  }
)

Usage patterns

Multi-trigger steps

Handle different trigger types with shared logic:
export const { config, handler } = step(
  {
    name: 'ProcessData',
    triggers: [
      http('POST', '/data', { bodySchema: dataSchema }),
      queue('data-events', { input: dataSchema }),
      cron('0 * * * *'),
    ],
  },
  async (input, ctx) => {
    let data
    
    if (ctx.is.http(input)) {
      data = input.body
    } else if (ctx.is.queue(input)) {
      data = input
    } else if (ctx.is.cron(input)) {
      data = await fetchScheduledData()
    }
    
    await processData(data)
    
    return ctx.match({
      http: async () => ({ status: 200, body: { success: true } }),
    })
  }
)

State and logging

Combine state management with structured logging:
async (input, ctx) => {
  const orderId = input.orderId
  
  ctx.logger.info('Fetching order', { orderId, traceId: ctx.traceId })
  
  const order = await ctx.state.get<Order>('orders', orderId)
  
  if (!order) {
    ctx.logger.warn('Order not found', { orderId })
    return { status: 404, body: { error: 'Order not found' } }
  }
  
  ctx.logger.info('Order retrieved', { order })
  return { status: 200, body: order }
}

Build docs developers (and LLMs) love