
Overview

The Flow Context (ctx) is passed as the second argument to every step handler. It provides utilities for:
  • Enqueuing events to other steps
  • Managing state with key-value storage
  • Writing structured logs
  • Accessing streams for real-time data
  • Inspecting trigger information about how the step was invoked

Context Structure

interface FlowContext<TEnqueueData = never, TInput = unknown> {
  enqueue: Enqueuer<TEnqueueData>
  traceId: string
  state: InternalStateManager
  logger: Logger
  streams: Streams
  trigger: TriggerInfo
  
  // Type guards
  is: {
    queue: (input: TInput) => boolean
    http: (input: TInput) => boolean
    cron: (input: TInput) => boolean
    state: (input: TInput) => boolean
    stream: (input: TInput) => boolean
  }
  
  // Utilities
  getData: () => ExtractDataPayload<TInput>
  match: <TResult>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => Promise<TResult>
}
Reference: motia-js/packages/motia/src/types.ts:58-92, motia-py/packages/motia/src/motia/types.py:36-114

ctx.enqueue()

Enqueue events to trigger other steps asynchronously.

Basic Usage

// Enqueue a single event
await ctx.enqueue({
  topic: 'order.created',
  data: {
    orderId: '12345',
    amount: 99.99,
    items: ['item1', 'item2'],
  },
})

// For FIFO queues, specify a message group ID
await ctx.enqueue({
  topic: 'order.processing',
  data: { orderId: '12345' },
  messageGroupId: 'customer-456', // Ensures ordering per customer
})
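
To illustrate what a message group ID provides, here is a minimal in-memory sketch. It is not Motia's queue implementation. It assumes that FIFO semantics mean "messages sharing a group ID are delivered in enqueue order, while different groups are independent". The names `QueuedMessage` and `partitionByGroup` are hypothetical.

```typescript
// Hypothetical message shape mirroring the arguments passed to ctx.enqueue().
type QueuedMessage = {
  topic: string
  data: unknown
  messageGroupId?: string
}

// Partition messages by messageGroupId while preserving enqueue order inside
// each group. Messages without a group ID land in a shared 'ungrouped' bucket
// (a choice made for this sketch only).
function partitionByGroup(messages: QueuedMessage[]): Map<string, QueuedMessage[]> {
  const groups = new Map<string, QueuedMessage[]>()
  for (const message of messages) {
    const key = message.messageGroupId ?? 'ungrouped'
    const bucket = groups.get(key) ?? []
    bucket.push(message)
    groups.set(key, bucket)
  }
  return groups
}

const pending: QueuedMessage[] = [
  { topic: 'order.processing', data: { orderId: '1' }, messageGroupId: 'customer-456' },
  { topic: 'order.processing', data: { orderId: '2' }, messageGroupId: 'customer-789' },
  { topic: 'order.processing', data: { orderId: '3' }, messageGroupId: 'customer-456' },
]

// Orders 1 and 3 stay in sequence for customer-456, independent of customer-789.
const byCustomer = partitionByGroup(pending)
```

Choosing the customer ID as the group key keeps each customer's events ordered without serializing unrelated customers behind one another.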

Type Safety

Enqueue topics are type-checked against the step’s enqueues configuration:
export const config = {
  name: 'ProcessOrder',
  enqueues: ['order.processed', 'order.failed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  // ✅ Type-safe - 'order.processed' is in enqueues array
  await ctx.enqueue({ topic: 'order.processed', data: { orderId: '123' } })
  
  // ❌ Type error - 'invalid.topic' not in enqueues array
  await ctx.enqueue({ topic: 'invalid.topic', data: {} })
}
Reference: motia-js/packages/motia/src/types.ts:28-29, motia-js/playground/steps/hello/hello-api.step.ts:31-39
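
The compile-time check relies on `as const` preserving the topic strings as literal types. The sketch below shows one way such a union can be derived from a config object. It is an illustration of the TypeScript mechanism, not the actual `Handlers` type from the motia package. `TopicOf`, `orderStepConfig`, and `isDeclaredTopic` are hypothetical names.

```typescript
// Hypothetical config; `as const` keeps the topics as string literal types.
const orderStepConfig = {
  name: 'ProcessOrder',
  enqueues: ['order.processed', 'order.failed'],
} as const

// Union of the declared topics: 'order.processed' | 'order.failed'.
type TopicOf<C extends { enqueues: readonly string[] }> = C['enqueues'][number]

// Runtime counterpart of the compile-time check, useful when a topic
// arrives as a plain string (for example from configuration).
function isDeclaredTopic(config: { enqueues: readonly string[] }, topic: string): boolean {
  return config.enqueues.indexOf(topic) !== -1
}

// Accepted by the compiler because the literal is part of the union.
const okTopic: TopicOf<typeof orderStepConfig> = 'order.processed'
// Rejected by the compiler if uncommented:
// const badTopic: TopicOf<typeof orderStepConfig> = 'invalid.topic'
```

Without `as const`, the array would widen to `string[]` and every topic string would be accepted.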

ctx.state

Access the distributed key-value state management system.

State Operations

// Set a value
await ctx.state.set('users', 'user-123', {
  id: 'user-123',
  name: 'Alice',
  email: 'alice@example.com',
})

// Get a value
const user = await ctx.state.get<User>('users', 'user-123')

// Update a value (JSON Patch operations)
await ctx.state.update('users', 'user-123', [
  { op: 'replace', path: '/name', value: 'Alice Smith' },
  { op: 'add', path: '/age', value: 30 },
])

// Delete a value
await ctx.state.delete('users', 'user-123')

// List all values in a group
const allUsers = await ctx.state.list<User>('users')

// Clear entire group
await ctx.state.clear('users')
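
The update call takes JSON Patch (RFC 6902) operations. As a rough mental model, the sketch below applies `add` and `replace` to top-level keys of a plain object. It is a simplified stand-in, not the engine's patch logic: nested paths, JSON Pointer escaping, and the other RFC 6902 operations are deliberately left out. `PatchOp` and `applyTopLevelPatch` are hypothetical names.

```typescript
// Subset of RFC 6902 operations used in the examples above.
type PatchOp =
  | { op: 'add'; path: string; value: unknown }
  | { op: 'replace'; path: string; value: unknown }

// Apply operations to a shallow copy of the target. Only single-segment
// paths such as '/name' are handled in this sketch.
function applyTopLevelPatch(
  target: Record<string, unknown>,
  ops: PatchOp[],
): Record<string, unknown> {
  const result: Record<string, unknown> = { ...target }
  for (const operation of ops) {
    const key = operation.path.slice(1) // drop the leading '/'
    if (operation.op === 'replace' && !(key in result)) {
      // RFC 6902: 'replace' requires the target location to exist.
      throw new Error(`Cannot replace missing path ${operation.path}`)
    }
    result[key] = operation.value
  }
  return result
}

const patched = applyTopLevelPatch({ id: 'user-123', name: 'Alice' }, [
  { op: 'replace', path: '/name', value: 'Alice Smith' },
  { op: 'add', path: '/age', value: 30 },
])
// patched is { id: 'user-123', name: 'Alice Smith', age: 30 }
```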

State Structure

State is organized hierarchically:
group_id (e.g., "users", "orders", "sessions")
  └── item_id (e.g., "user-123", "order-456")
        └── value (any JSON-serializable data)
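
The hierarchy can be pictured as a map of maps. The following in-memory sketch models only that shape. It is synchronous and local, whereas the real `ctx.state` is asynchronous and distributed. `StateStoreSketch` is a hypothetical name.

```typescript
// In-memory model of the hierarchy: group_id -> item_id -> value.
class StateStoreSketch {
  private groups = new Map<string, Map<string, unknown>>()

  set(groupId: string, itemId: string, value: unknown): void {
    const group = this.groups.get(groupId) ?? new Map<string, unknown>()
    group.set(itemId, value)
    this.groups.set(groupId, group)
  }

  get(groupId: string, itemId: string): unknown {
    return this.groups.get(groupId)?.get(itemId)
  }

  // All values in one group; an unknown group yields an empty array.
  list(groupId: string): unknown[] {
    const group = this.groups.get(groupId)
    return group ? Array.from(group.values()) : []
  }

  // Remove the whole group and every item in it.
  clear(groupId: string): void {
    this.groups.delete(groupId)
  }
}

const stateSketch = new StateStoreSketch()
stateSketch.set('users', 'user-123', { name: 'Alice' })
stateSketch.set('users', 'user-456', { name: 'Bob' })
stateSketch.set('orders', 'order-456', { total: 10 })
```

Because items are addressed by both group and item ID, clearing `users` leaves `orders` untouched.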

State Triggers

State changes automatically fire state triggers:
// This set operation...
await ctx.state.set('users', 'user-123', { name: 'Alice', status: 'active' })

// ...triggers steps with state triggers:
export const config = {
  name: 'OnUserChange',
  triggers: [
    state({
      condition: (input) => input.group_id === 'users',
    }),
  ],
}

export const handler = async (input, ctx) => {
  // input.group_id === 'users'
  // input.item_id === 'user-123'
  // input.new_value === { name: 'Alice', status: 'active' }
  // input.old_value === (previous value or undefined)
}
Reference: motia-js/packages/motia/src/types.ts:19-26, motia-js/playground/steps/multi-trigger-example.step.ts:79, motia-py/packages/motia/src/motia/types.py:21-31
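
A condition can be narrower than a group check. The sketch below is a standalone predicate that fires only when a user transitions to `active`. It assumes the condition receives the same `group_id`, `old_value`, and `new_value` fields that the handler comments above list, which this page does not confirm. `StateChange`, `UserRecord`, and `becameActive` are hypothetical names.

```typescript
// Hypothetical shape of a state change, based on the fields listed above.
type StateChange<T> = {
  group_id: string
  item_id: string
  old_value?: T
  new_value?: T
}

type UserRecord = { name: string; status: 'active' | 'inactive' }

// True only when an item in the 'users' group changes from a non-active
// (or missing) status to 'active'.
function becameActive(change: StateChange<UserRecord>): boolean {
  return (
    change.group_id === 'users' &&
    change.new_value?.status === 'active' &&
    change.old_value?.status !== 'active'
  )
}
```

Comparing old and new values in the condition avoids running the handler on writes that leave the status unchanged.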

ctx.logger

Structured logging with automatic trace correlation.

Logging Levels

ctx.logger.debug('Debug message', { details: 'low-level info' })
ctx.logger.info('Info message', { userId: '123' })
ctx.logger.warn('Warning message', { reason: 'rate limit approaching' })
ctx.logger.error('Error message', { error: err.message, stack: err.stack })

Automatic Trace Correlation

All logs are automatically tagged with:
  • traceId: Unique identifier for the entire workflow
  • spanId: Identifier for the current operation
  • trigger metadata: Type, path, topic, etc.
Logs are ingested via OpenTelemetry and viewable in the Motia Console.
Reference: motia-js/packages/motia/src/types.ts:62, engine/src/engine/mod.rs:67-75
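
To make the idea of automatic correlation concrete, here is a minimal sketch of a logger that stamps every entry with a trace ID. It is an illustration only and does not reflect how `ctx.logger` is implemented. Span IDs and trigger metadata are omitted, and `TraceLogEntry` and `createTraceLogger` are hypothetical names.

```typescript
type TraceLogLevel = 'debug' | 'info' | 'warn' | 'error'

type TraceLogEntry = {
  level: TraceLogLevel
  message: string
  traceId: string
  fields: Record<string, unknown>
}

// Build a logger bound to one trace ID. Entries are pushed to `sink`
// instead of being exported, so the result is easy to inspect.
function createTraceLogger(traceId: string, sink: TraceLogEntry[]) {
  const write =
    (level: TraceLogLevel) =>
    (message: string, fields: Record<string, unknown> = {}): void => {
      sink.push({ level, message, traceId, fields })
    }
  return {
    debug: write('debug'),
    info: write('info'),
    warn: write('warn'),
    error: write('error'),
  }
}

const collectedLogs: TraceLogEntry[] = []
const traceLogger = createTraceLogger('trace-abc', collectedLogs)
traceLogger.info('Order processed', { orderId: '123' })
traceLogger.error('Payment failed')
```

Callers never pass the trace ID themselves; binding it once at construction is what keeps every entry correlated.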

ctx.streams

Access real-time data streams for collaborative applications.

Stream Operations

// Set a stream item
const result = await ctx.streams.todo.set('inbox', todoId, {
  id: todoId,
  description: 'Buy groceries',
  createdAt: new Date().toISOString(),
})

// Get a stream item
const todo = await ctx.streams.todo.get('inbox', todoId)

// Update a stream item
await ctx.streams.todo.update('inbox', todoId, [
  { op: 'replace', path: '/description', value: 'Buy groceries and cook dinner' },
])

// Delete a stream item
await ctx.streams.todo.delete('inbox', todoId)

// List items in a group
const allTodos = await ctx.streams.todo.list('inbox')

Streams vs State

| Feature | Streams | State |
| --- | --- | --- |
| Use Case | Real-time collaboration, UI sync | Backend data storage |
| Persistence | Optional (configurable) | Always persisted |
| WebSocket Updates | Yes (live updates to clients) | No |
| Access Control | Per-stream auth | Global |
| Triggers | Stream triggers | State triggers |
Reference: motia-js/playground/steps/todo/create-todo.step.ts:47, engine/src/modules/stream/mod.rs

ctx.trigger

Information about how the step was triggered.

Trigger Info Structure

type TriggerInfo = {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number  // Index in triggers array
  
  // HTTP-specific
  path?: string
  method?: string
  
  // Queue-specific
  topic?: string
  
  // Cron-specific
  expression?: string
}

Usage Example

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Step triggered', {
    triggerType: ctx.trigger.type,
    triggerPath: ctx.trigger.path,
    triggerTopic: ctx.trigger.topic,
  })
  
  if (ctx.trigger.type === 'http') {
    ctx.logger.info(`HTTP ${ctx.trigger.method} ${ctx.trigger.path}`)
  } else if (ctx.trigger.type === 'queue') {
    ctx.logger.info(`Queue topic: ${ctx.trigger.topic}`)
  }
}
Reference: motia-js/packages/motia/src/types.ts:98-105
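
Because the trigger-specific fields are optional, interpolating them directly can print `undefined`. The sketch below formats a trigger with explicit fallbacks. `TriggerInfoSketch` is a local copy of the structure shown above, and `describeTrigger` is a hypothetical helper, not part of the motia API.

```typescript
// Local copy of the TriggerInfo shape shown above.
type TriggerInfoSketch = {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number
  path?: string
  method?: string
  topic?: string
  expression?: string
}

// Produce a short human-readable description, falling back to '?' when
// an optional field is absent.
function describeTrigger(trigger: TriggerInfoSketch): string {
  switch (trigger.type) {
    case 'http':
      return `HTTP ${trigger.method ?? '?'} ${trigger.path ?? '?'}`
    case 'queue':
      return `Queue topic: ${trigger.topic ?? '?'}`
    case 'cron':
      return `Cron: ${trigger.expression ?? '?'}`
    default:
      // 'state' and 'stream' carry no extra fields in the structure above.
      return `Trigger: ${trigger.type}`
  }
}
```

Inside a handler this could be called as `describeTrigger(ctx.trigger)`, assuming the runtime value matches the structure above.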

ctx.traceId

Unique identifier for distributed tracing across the workflow.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Log the trace ID
  ctx.logger.info('Processing request', { traceId: ctx.traceId })
  
  // Pass to external services for correlation
  await fetch('https://api.example.com/process', {
    headers: {
      'X-Trace-Id': ctx.traceId,
    },
  })
}
The trace ID follows W3C Trace Context format and is automatically propagated across function calls and enqueued events.
Reference: motia-js/packages/motia/src/types.ts:60, engine/src/protocol.rs:75-80
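
If a downstream service understands W3C Trace Context, the standard `traceparent` header is an alternative to a custom `X-Trace-Id` header. The sketch below builds a version-00 header value. It assumes `ctx.traceId` is the bare 32-character hex trace-id rather than a full `traceparent` string, which this page does not specify. `buildTraceparent` and the `parentId` argument (a 16-character hex span ID) are hypothetical.

```typescript
const HEX_32 = /^[0-9a-f]{32}$/
const HEX_16 = /^[0-9a-f]{16}$/
const ALL_ZEROS = /^0+$/

// Build a W3C Trace Context `traceparent` value:
// version "-" trace-id "-" parent-id "-" trace-flags
function buildTraceparent(traceId: string, parentId: string, sampled: boolean): string {
  if (!HEX_32.test(traceId) || ALL_ZEROS.test(traceId)) {
    throw new Error('trace-id must be 32 lowercase hex characters and not all zeros')
  }
  if (!HEX_16.test(parentId) || ALL_ZEROS.test(parentId)) {
    throw new Error('parent-id must be 16 lowercase hex characters and not all zeros')
  }
  return `00-${traceId}-${parentId}-${sampled ? '01' : '00'}`
}

const traceparentExample = buildTraceparent(
  '4bf92f3577b34da6a3ce929d0e0e4736',
  '00f067aa0ba902b7',
  true,
)
// traceparentExample === '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'
```

Validating before sending avoids forwarding an ID that a compliant receiver would discard.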

Type Guards (ctx.is)

Check which trigger type invoked the step.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  if (ctx.is.http(input)) {
    // TypeScript narrows input to MotiaHttpArgs
    const body = input.request.body
    return { status: 200, body: { message: 'OK' } }
  }
  
  if (ctx.is.queue(input)) {
    // TypeScript narrows input to queue payload
    ctx.logger.info('Processing queue event', input)
  }
  
  if (ctx.is.cron(input)) {
    // input is undefined for cron triggers
    ctx.logger.info('Cron trigger fired')
  }
}
Reference: motia-js/packages/motia/src/types.ts:66-72
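
Narrowing of this kind is typically expressed in TypeScript with a user-defined type predicate. The sketch below shows the mechanism with simplified, hypothetical input shapes. It detects HTTP input structurally, whereas the real guards presumably consult the trigger type. `HttpInputSketch`, `QueueInputSketch`, `isHttpInput`, and `summarizeInput` are all assumptions made for this example.

```typescript
// Hypothetical input shapes; the real types live in the motia package.
type HttpInputSketch = { request: { body: unknown } }
type QueueInputSketch = { orderId: string }
type StepInputSketch = HttpInputSketch | QueueInputSketch | undefined

// The `input is HttpInputSketch` return type is what lets the compiler
// narrow the union inside an `if` branch.
function isHttpInput(input: StepInputSketch): input is HttpInputSketch {
  return input !== undefined && 'request' in input
}

function summarizeInput(input: StepInputSketch): string {
  if (isHttpInput(input)) {
    // input is HttpInputSketch here
    return `http body: ${JSON.stringify(input.request.body)}`
  }
  if (input === undefined) {
    // cron triggers carry no input
    return 'cron (no input)'
  }
  // Only QueueInputSketch remains
  return `queue order: ${input.orderId}`
}
```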

ctx.getData()

Extract the data payload regardless of trigger type.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Works for both HTTP (extracts request.body) and queue triggers (returns data directly)
  const data = ctx.getData()
  
  // Process data uniformly
  const result = await processData(data)
  
  if (ctx.is.http(input)) {
    return { status: 200, body: result }
  }
}
Reference: motia-js/packages/motia/src/types.ts:74-88, motia-py/packages/motia/src/motia/types.py:69-83
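
The unwrapping behavior described above can be approximated as follows. This is a sketch under stated assumptions, not the library's implementation: it decides by the shape of the input, while the real `getData` most likely decides by trigger type. `extractData` is a hypothetical name.

```typescript
// Return `request.body` for HTTP-shaped input, otherwise the input itself.
function extractData(input: unknown): unknown {
  if (typeof input === 'object' && input !== null && 'request' in input) {
    const request = (input as { request: unknown }).request
    if (typeof request === 'object' && request !== null && 'body' in request) {
      return (request as { body: unknown }).body
    }
  }
  // Queue payloads (and anything else) pass through unchanged.
  return input
}

const fromHttp = extractData({ request: { body: { orderId: '1' } } })
const fromQueue = extractData({ orderId: '2' })
```

A shape-based check would misclassify a queue payload that happens to contain its own `request.body` field, which is one reason to prefer the built-in `ctx.getData()` in real handlers.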

ctx.match()

Handle multiple trigger types with pattern matching.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      ctx.logger.info('HTTP request', { body: request.body })
      return { status: 200, body: { message: 'OK' } }
    },
    
    queue: async (data) => {
      ctx.logger.info('Queue event', data)
      await processQueueEvent(data)
    },
    
    cron: async () => {
      ctx.logger.info('Cron triggered')
      await runScheduledTask()
    },
    
    state: async (stateInput) => {
      ctx.logger.info('State changed', {
        group: stateInput.group_id,
        item: stateInput.item_id,
      })
    },
    
    stream: async (streamInput) => {
      ctx.logger.info('Stream event', {
        stream: streamInput.streamName,
        event: streamInput.event.type,
      })
    },
    
    default: async (input) => {
      ctx.logger.warn('Unknown trigger type', { input })
    },
  })
}
Reference: motia-js/packages/motia/src/types.ts:91, motia-js/playground/steps/multi-trigger-example.step.ts:70-152, motia-py/packages/motia/src/motia/types.py:84-113
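
Conceptually, pattern matching of this kind is a lookup by trigger type with a fallback. The sketch below shows that dispatch in isolation. It is not the motia implementation: handlers receive `unknown` instead of narrowed inputs, and returning `undefined` when nothing matches is a choice made for this example that may differ from the real behavior. `TriggerKind`, `MatchHandlersSketch`, and `matchSketch` are hypothetical names.

```typescript
type TriggerKind = 'http' | 'queue' | 'cron' | 'state' | 'stream'

type SketchHandler<TResult> = (input: unknown) => Promise<TResult>

// One optional handler per trigger kind, plus an optional fallback.
type MatchHandlersSketch<TResult> = Partial<Record<TriggerKind, SketchHandler<TResult>>> & {
  default?: SketchHandler<TResult>
}

// Pick the handler for the given kind, fall back to `default`, and
// resolve to undefined when neither is provided.
async function matchSketch<TResult>(
  kind: TriggerKind,
  input: unknown,
  handlers: MatchHandlersSketch<TResult>,
): Promise<TResult | undefined> {
  const handler = handlers[kind] ?? handlers.default
  return handler ? handler(input) : undefined
}
```

Only the handlers for the triggers a step actually declares need to be supplied. The `default` branch covers the rest.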

Best Practices

Always include relevant context in logs. Avoid plain string messages:
// ❌ Bad
ctx.logger.info('Order processed')

// ✅ Good
ctx.logger.info('Order processed', { orderId, amount, status })
Declare all topics in the enqueues array for compile-time validation:
export const config = {
  enqueues: ['order.created', 'order.failed'],
} as const satisfies StepConfig
Use meaningful group IDs for state organization:
await ctx.state.set('users', userId, userData)      // ✅ Good
await ctx.state.set('user-data', userId, userData)  // ❌ Redundant
Wrap state, stream, and enqueue operations in try-catch blocks:
try {
  await ctx.state.set('users', userId, data)
} catch (err) {
  ctx.logger.error('Failed to save user', {
    // A caught value is not guaranteed to be an Error instance
    error: err instanceof Error ? err.message : String(err),
  })
  throw err
}
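
Under strict TypeScript settings, a caught value can be typed `unknown`, because any value may be thrown. A small helper can normalize it into structured log fields before logging. `toErrorFields` below is a hypothetical helper, not part of the motia API.

```typescript
// Normalize an unknown thrown value into structured log fields.
function toErrorFields(err: unknown): { error: string; stack: string | undefined } {
  if (err instanceof Error) {
    return { error: err.message, stack: err.stack }
  }
  // Non-Error throws (strings, numbers, plain objects) have no stack.
  return { error: String(err), stack: undefined }
}
```

Inside a catch block this could be used as `ctx.logger.error('Failed to save user', toErrorFields(err))`, keeping the log call free of type assertions.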

Next Steps

State Management

Deep dive into state operations

Streams

Learn about real-time streams

Observability

Explore logs, traces, and metrics

Workflows

Organize steps into workflows
