Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt

Use this file to discover all available pages before exploring further.

Overview

The FlowContext provides access to state management, logging, queue operations, and stream utilities within step handlers.

Types

FlowContext

The main context object passed to step handlers.
/**
 * The main context object passed to step handlers.
 *
 * @typeParam TEnqueueData - Message type accepted by `enqueue`; defaults to `never`.
 * @typeParam TInput - Step input type used by `is`, `getData` and `match`; defaults to `unknown`.
 */
interface FlowContext<TEnqueueData = never, TInput = unknown> {
  /** Function to enqueue messages to topics. */
  enqueue: Enqueuer<TEnqueueData>
  /** Unique identifier for tracing the current execution. */
  traceId: string
  /** State management interface for persistent storage. */
  state: InternalStateManager
  /** Logger instance for structured logging. */
  logger: Logger
  /** Dictionary of configured streams accessible by name. */
  streams: Streams
  /** Information about the trigger that activated this step. */
  trigger: TriggerInfo
  /** Type guard functions to check trigger types. */
  is: TypeGuards<TInput>
  /** Extracts the data payload from input regardless of trigger type. */
  getData: () => ExtractDataPayload<TInput>
  /**
   * Pattern matching for handling different trigger types.
   * The returned promise may resolve to `undefined` as well as to a handler's `TResult`.
   */
  match: <TResult = any>(
    handlers: MatchHandlers<TInput, TEnqueueData, TResult>
  ) => Promise<TResult | undefined>
}

Properties

enqueue
Enqueuer<TEnqueueData>
Function to enqueue messages to topics.
/** Function that enqueues one message of type `TData`; the returned promise carries no value. */
type Enqueuer<TData> = (event: TData) => Promise<void>
Usage:
await ctx.enqueue({ topic: 'process-order', data: orderData })
traceId
string
Unique identifier for tracing the current execution.
state
InternalStateManager
State management interface for persistent storage.
get
<T>(groupId: string, key: string) => Promise<T | null>
Retrieve a value from state.
set
<T>(groupId: string, key: string, value: T) => Promise<StreamSetResult<T> | null>
Store a value in state.
update
<T>(groupId: string, key: string, ops: UpdateOp[]) => Promise<StreamSetResult<T> | null>
Update a value using operations.
delete
<T>(groupId: string, key: string) => Promise<T | null>
Delete a value from state.
list
<T>(groupId: string) => Promise<T[]>
List all values in a group.
clear
(groupId: string) => Promise<void>
Clear all values in a group.
logger
Logger
Logger instance for structured logging.
ctx.logger.info('Processing order', { orderId: '123' })
ctx.logger.error('Failed to process', { error })
streams
Streams
Dictionary of configured streams accessible by name.
const chatStream = ctx.streams.chat
await chatStream.set(groupId, itemId, data)
trigger
TriggerInfo
Information about the trigger that activated this step.
/**
 * Information about the trigger that activated the step.
 * Only `type` is required; every other field is optional.
 */
type TriggerInfo = {
  /** Kind of trigger; one of the five literal values below. */
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  // NOTE(review): presumably the position of the trigger in the step's `triggers` array — confirm.
  index?: number
  // NOTE(review): `path`/`method` look HTTP-specific, `topic` queue-specific and
  // `expression` cron-specific — confirm which fields are populated per trigger type.
  path?: string
  method?: string
  topic?: string
  expression?: string
}
is
TypeGuards<TInput>
Type guard functions to check trigger types.
{
  queue: (input: TInput) => input is ExtractQueueInput<TInput>
  http: (input: TInput) => input is ExtractApiInput<TInput>
  cron: (input: TInput) => input is never
  state: (input: TInput) => input is ExtractStateInput<TInput>
  stream: (input: TInput) => input is ExtractStreamInput<TInput>
}
Usage:
if (ctx.is.http(input)) {
  // input is typed as HTTP request
  const body = input.request.body
}
getData
() => ExtractDataPayload<TInput>
Extracts the data payload from input regardless of trigger type.
  • For HTTP triggers: returns request.body
  • For queue triggers: returns the data directly
  • For cron triggers: returns undefined
const orderData = ctx.getData() // Works for both queue and HTTP
match
<TResult = any>(handlers: MatchHandlers<TInput, TEnqueueData, TResult>) => Promise<TResult | undefined>
Pattern matching for handling different trigger types.
return ctx.match({
  http: async (request) => {
    return { status: 200, body: { success: true } }
  },
  queue: async (data) => {
    await processQueueData(data)
  },
  cron: async () => {
    await runScheduledTask()
  }
})

Input Types

ApiRequest

/**
 * Request shape for API input.
 *
 * @typeParam TBody - Type of the `body` field; defaults to `unknown`.
 */
interface ApiRequest<TBody = unknown> {
  /** Path parameters; each key maps to a single string. */
  pathParams: Record<string, string>
  /** Query parameters; a key maps to one string or an array of strings. */
  queryParams: Record<string, string | string[]>
  /** Request body, typed by `TBody`. */
  body: TBody
  /** Headers; a key maps to one string or an array of strings. */
  headers: Record<string, string | string[]>
}

MotiaHttpArgs

/**
 * Bundles an HTTP request with its response handle.
 *
 * @typeParam TBody - Type of `request.body`; defaults to `unknown`.
 */
interface MotiaHttpArgs<TBody = unknown> {
  /** The incoming request, including its typed body. */
  request: MotiaHttpRequest<TBody>
  /** Handle used to set status and headers and to write the response stream. */
  response: MotiaHttpResponse
}

/**
 * HTTP request shape: the same four fields as `ApiRequest`, plus `method` and `requestBody`.
 *
 * @typeParam TBody - Type of the `body` field; defaults to `unknown`.
 */
interface MotiaHttpRequest<TBody = unknown> {
  /** Path parameters; each key maps to a single string. */
  pathParams: Record<string, string>
  /** Query parameters; a key maps to one string or an array of strings. */
  queryParams: Record<string, string | string[]>
  /** Request body, typed by `TBody`. */
  body: TBody
  /** Headers; a key maps to one string or an array of strings. */
  headers: Record<string, string | string[]>
  /** HTTP method as a plain string (not restricted to a literal union). */
  method: string
  // NOTE(review): `ChannelReader` is not defined on this page; presumably a reader
  // over the raw request body — confirm against its declaration.
  requestBody: ChannelReader
}

/**
 * Response handle exposed on `MotiaHttpArgs.response`.
 * All three methods return `void`; the body is written through `stream`.
 */
interface MotiaHttpResponse {
  /** Takes the numeric status code for the response. */
  status: (statusCode: number) => void
  /** Takes response headers as single string values per key. */
  headers: (headers: Record<string, string>) => void
  /** Node.js writable stream for the response body. */
  stream: NodeJS.WritableStream
  // NOTE(review): presumably ends the response; ordering requirements relative to
  // `status`/`headers`/`stream` writes are not stated here — confirm.
  close: () => void
}

StateTriggerInput

/**
 * Input delivered to a step activated by a state trigger.
 * Field names here are snake_case, unlike the camelCase fields of `StreamTriggerInput`.
 *
 * @typeParam T - Type of the stored value.
 */
type StateTriggerInput<T> = {
  /** Literal discriminator identifying a state-trigger input. */
  type: 'state'
  group_id: string
  item_id: string
  // Both values are optional.
  // NOTE(review): presumably `old_value` is absent on creation and `new_value` on deletion — confirm.
  old_value?: T
  new_value?: T
}

StreamTriggerInput

/**
 * Input delivered to a step activated by a stream trigger.
 *
 * @typeParam T - Type of the data carried by the stream event.
 */
type StreamTriggerInput<T> = {
  /** Literal discriminator identifying a stream-trigger input. */
  type: 'stream'
  // NOTE(review): unit/epoch of `timestamp` is not stated here — confirm.
  timestamp: number
  streamName: string
  groupId: string
  id: string
  /** The create/update/delete event, including its data. */
  event: StreamEvent<T>
}

/**
 * Stream event as a union discriminated by `type`.
 * Every variant, including `delete`, carries a `data` payload of type `TData`.
 */
type StreamEvent<TData> =
  | { type: 'create'; data: TData }
  | { type: 'update'; data: TData }
  | { type: 'delete'; data: TData }

Response Types

ApiResponse

/**
 * Response object shape, e.g. `{ status: 200, body: { success: true } }`.
 *
 * @typeParam TStatus - Numeric status type; can be narrowed to specific literals.
 * @typeParam TBody - Type of the response body; defaults to `any`.
 */
type ApiResponse<TStatus extends number = number, TBody = any> = {
  status: TStatus
  /** Optional response headers, one string value per key. */
  headers?: Record<string, string>
  body: TBody
}

Match Handlers

MatchHandlers

/**
 * Handler map accepted by `FlowContext.match`; every handler is optional.
 *
 * `queue` and `cron` handlers resolve to `void`, while `http`, `state` and `stream`
 * handlers resolve to `TResult`.
 *
 * @typeParam TInput - Step input type from which each handler's argument is extracted.
 * @typeParam TEnqueueData - Not referenced by any member in this declaration.
 * @typeParam TResult - Value produced by the result-returning handlers.
 */
type MatchHandlers<TInput, TEnqueueData, TResult> = {
  queue?: (input: ExtractQueueInput<TInput>) => Promise<void>
  http?: (request: ExtractApiInput<TInput>) => Promise<TResult>
  /** Takes no argument. */
  cron?: () => Promise<void>
  state?: (input: ExtractStateInput<TInput>) => Promise<TResult>
  stream?: (input: ExtractStreamInput<TInput>) => Promise<TResult>
  // Receives the un-narrowed input.
  // NOTE(review): presumably used when no trigger-specific handler applies — confirm.
  default?: (input: TInput) => Promise<TResult | undefined>
}

Stream Configuration

StreamConfig

/**
 * Configuration for a stream.
 * `name`, `schema` and `baseConfig` are required; the two hooks are optional.
 */
interface StreamConfig {
  // NOTE(review): presumably the key under which the stream appears in `ctx.streams` — confirm.
  name: string
  schema: StepSchemaInput
  /** The only storage type permitted by this declaration is `'default'`. */
  baseConfig: { storageType: 'default' }
  /**
   * Optional join hook. Receives the subscription, a `FlowContext` and an optional
   * auth context, and yields a `StreamJoinResult`.
   */
  // NOTE(review): `PromiseOrValue` is not defined on this page; presumably the hook
  // may return either a value or a promise of it — confirm.
  onJoin?: (
    subscription: StreamSubscription,
    context: FlowContext,
    authContext?: StreamContext
  ) => PromiseOrValue<StreamJoinResult>
  /** Optional leave hook with the same parameters as `onJoin`; yields no value. */
  onLeave?: (
    subscription: StreamSubscription,
    context: FlowContext,
    authContext?: StreamContext
  ) => PromiseOrValue<void>
}

StreamAuthInput

/**
 * Input for stream authentication.
 * Unlike `ApiRequest`, headers are single strings and query parameters are always string arrays.
 */
interface StreamAuthInput {
  headers: Record<string, string>
  path: string
  queryParams: Record<string, string[]>
  // NOTE(review): presumably the client's network address; format is not stated here — confirm.
  addr: string
}

Usage Example

import { step, http, queue } from 'motia'
import { z } from 'zod'

/**
 * Example step `order-handler`.
 *
 * It is activated either by `POST /orders` or by the `process-orders` queue; both
 * triggers declare the same `{ orderId, items }` schema. The handler stores the order
 * in state, logs it, writes a notification to a stream, then branches on trigger type.
 */
export default step(
  {
    name: 'order-handler',
    triggers: [
      http('POST', '/orders', {
        bodySchema: z.object({ orderId: z.string(), items: z.array(z.string()) })
      }),
      queue('process-orders', {
        input: z.object({ orderId: z.string(), items: z.array(z.string()) })
      })
    ],
    // Topic this step enqueues to (used by the queue branch below).
    enqueues: ['send-confirmation']
  },
  async (input, ctx) => {
    // Extract data regardless of trigger type
    const orderData = ctx.getData()
    
    // Use state
    // Arguments follow state.set(groupId, key, value): group 'orders', keyed by order id.
    await ctx.state.set('orders', orderData.orderId, orderData)
    
    // Log
    ctx.logger.info('Processing order', { orderId: orderData.orderId })
    
    // Use streams
    // NOTE(review): assumes a stream named `notifications` is configured elsewhere — confirm.
    await ctx.streams.notifications.set(
      'user-123',
      'order-update',
      { message: 'Order received' }
    )
    
    // Pattern match on trigger type
    return ctx.match({
      // HTTP branch: returns a response object; `req` is not used.
      http: async (req) => {
        return { status: 200, body: { success: true } }
      },
      // Queue branch: enqueues to the declared 'send-confirmation' topic and returns nothing.
      queue: async (data) => {
        await ctx.enqueue({
          topic: 'send-confirmation',
          data: { orderId: data.orderId }
        })
      }
    })
  }
)

Build docs developers (and LLMs) love