Skip to main content
Every step handler receives a FlowContext object as its second parameter. This context provides access to Motia’s runtime features: queues, state, streams, logging, and trigger information.

FlowContext interface

interface FlowContext<TEnqueueData, TInput> {
  // Queue operations
  enqueue: Enqueuer<TEnqueueData>
  
  // Tracing
  traceId: string
  
  // State management
  state: InternalStateManager
  
  // Logging
  logger: Logger
  
  // Streams
  streams: Streams
  
  // Trigger information
  trigger: TriggerInfo
  
  // Type guards
  is: {
    queue: (input: TInput) => input is QueueInput
    http: (input: TInput) => input is HttpInput
    cron: (input: TInput) => input is never
    state: (input: TInput) => input is StateTriggerInput
    stream: (input: TInput) => input is StreamTriggerInput
  }
  
  // Data extraction
  getData: () => ExtractDataPayload<TInput>
  
  // Pattern matching
  match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => 
    Promise<TResult | undefined>
}

Enqueue

Publish messages to queue topics defined in your step’s enqueues array.
import { step, http } from 'motia'

export default step({
  name: 'create-order',
  triggers: [http('POST', '/orders')],
  enqueues: ['order-created', 'send-email']
}, async (input, ctx) => {
  const order = await createOrder(input.request.body)
  
  // Enqueue to order-created topic
  await ctx.enqueue({
    topic: 'order-created',
    data: { orderId: order.id, amount: order.amount }
  })
  
  // Enqueue with message group ID (for FIFO)
  await ctx.enqueue({
    topic: 'send-email',
    data: { to: order.email, template: 'order-confirmation' },
    messageGroupId: `customer-${order.customerId}`
  })
  
  return { status: 201, body: order }
})
You can only enqueue to topics declared in the step’s enqueues configuration. Motia validates this at build time.

State

Access scoped key-value storage that persists across step executions.
import { step, http } from 'motia'

export default step({
  name: 'track-visits',
  triggers: [http('GET', '/page/:pageId')]
}, async (input, ctx) => {
  const { pageId } = input.request.pathParams
  const scope = 'page-visits'
  
  // Get current count
  const current = await ctx.state.get<{ count: number }>(scope, pageId)
  const count = (current?.count ?? 0) + 1
  
  // Update count
  await ctx.state.set(scope, pageId, { count, lastVisit: Date.now() })
  
  return { status: 200, body: { pageId, visits: count } }
})

State operations

get
(scope: string, key: string) => Promise<T | null>
Retrieve a value by scope and key
set
(scope: string, key: string, value: T) => Promise<StreamSetResult<T> | null>
Store a value with scope and key
update
(scope: string, key: string, ops: UpdateOp[]) => Promise<StreamSetResult<T> | null>
Apply partial updates using update operations:
  • { type: 'set', path: 'field', value: newValue }
  • { type: 'delete', path: 'field' }
  • { type: 'increment', path: 'counter', value: 1 }
delete
(scope: string, key: string) => Promise<T | null>
Delete a value and return the previous value
list
(scope: string) => Promise<T[]>
List all values in a scope
clear
(scope: string) => Promise<void>
Delete all values in a scope
listGroups
() => Promise<string[]>
List all scope IDs
Use scopes to organize related data. For example, user-sessions, rate-limits, or feature-flags.

Streams

Access configured streams for real-time data operations.
import { step, http } from 'motia'

export default step({
  name: 'update-document',
  triggers: [http('PUT', '/documents/:id')]
}, async (input, ctx) => {
  const { id } = input.request.pathParams
  const updates = input.request.body
  
  // Access a stream
  const documents = ctx.streams.documents
  
  // Update the document
  await documents.update('workspace-1', id, [
    { type: 'set', path: 'content', value: updates.content },
    { type: 'set', path: 'updatedAt', value: Date.now() }
  ])
  
  // Get the updated document
  const doc = await documents.get('workspace-1', id)
  
  return { status: 200, body: doc }
})
Streams must be declared in motia.config.ts to be accessible via ctx.streams. See Streams for details.

Logger

Structured logging with automatic trace ID inclusion.
export default step({
  name: 'process-payment',
  triggers: [http('POST', '/payments')]
}, async (input, ctx) => {
  ctx.logger.info('Processing payment', { 
    amount: input.request.body.amount,
    currency: input.request.body.currency
  })
  
  try {
    const result = await chargeCard(input.request.body)
    ctx.logger.info('Payment successful', { transactionId: result.id })
    return { status: 200, body: result }
  } catch (error) {
    ctx.logger.error('Payment failed', { error: error.message })
    return { status: 500, body: { error: 'Payment processing failed' } }
  }
})

Log levels

  • ctx.logger.debug() - Detailed debugging information
  • ctx.logger.info() - General informational messages
  • ctx.logger.warn() - Warning messages
  • ctx.logger.error() - Error messages

Trace ID

Every step execution has a unique trace ID for distributed tracing.
export default step(config, async (input, ctx) => {
  // Pass trace ID to external services
  await fetch('https://api.example.com/process', {
    headers: {
      'X-Trace-Id': ctx.traceId
    },
    body: JSON.stringify(data)
  })
})

Trigger info

Inspect which trigger fired the current execution.
interface TriggerInfo {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number          // Index in triggers array
  
  // HTTP specific
  path?: string
  method?: string
  
  // Queue specific
  topic?: string
  
  // Cron specific
  expression?: string
}
Example usage:
export default step(config, async (input, ctx) => {
  ctx.logger.info('Step triggered', {
    type: ctx.trigger.type,
    method: ctx.trigger.method,
    path: ctx.trigger.path
  })
})

Type guards

Check trigger type at runtime with type-safe guards.
export default step({
  name: 'multi-trigger-step',
  triggers: [
    http('POST', '/items'),
    queue('items')
  ]
}, async (input, ctx) => {
  if (ctx.is.http(input)) {
    // TypeScript knows input is MotiaHttpArgs
    const body = input.request.body
    return { status: 200, body: { received: true } }
  }
  
  if (ctx.is.queue(input)) {
    // TypeScript knows input is queue data
    await processQueueMessage(input)
  }
})

Get data

Extract the data payload regardless of trigger type.
import { step, http, queue } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  orderId: z.string(),
  amount: z.number()
})

export default step({
  name: 'process-order',
  triggers: [
    http('POST', '/orders', { bodySchema: orderSchema }),
    queue('orders', { input: orderSchema })
  ]
}, async (input, ctx) => {
  // Extract data from both HTTP body and queue message
  const orderData = ctx.getData()
  // orderData is typed as z.infer<typeof orderSchema>
  
  await processOrder(orderData)
  
  if (ctx.is.http(input)) {
    return { status: 200, body: { success: true } }
  }
})
Use ctx.getData() when multiple triggers share the same data schema. This avoids duplicating data extraction logic.

Pattern matching

Handle different trigger types with type-safe pattern matching.
export default step({
  name: 'sync-user',
  triggers: [
    http('POST', '/users/:id/sync'),
    queue('user-sync'),
    cron('0 2 * * *') // Daily at 2 AM
  ]
}, async (input, ctx) => {
  return ctx.match({
    http: async (req) => {
      const userId = req.request.pathParams.id
      await syncUser(userId)
      return { status: 200, body: { synced: userId } }
    },
    queue: async (data) => {
      await syncUser(data.userId)
    },
    cron: async () => {
      await syncAllUsers()
    },
    default: async (input) => {
      ctx.logger.warn('Unhandled trigger type')
    }
  })
})
ctx.match() throws if no handler matches and no default handler is provided.

Next steps

Handlers

Learn handler patterns and best practices

State management

Deep dive into state operations

Streams

Working with real-time streams

Triggers

Review available trigger types

Build docs developers (and LLMs) love