Documentation Index
Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The FlowContext provides access to state management, logging, queue operations, and stream utilities within step handlers.
Types
FlowContext
The main context object passed to step handlers.
/**
 * The main context object passed to step handlers.
 *
 * TEnqueueData types the messages accepted by `enqueue` (defaults to never);
 * TInput is the handler's input type and parameterizes `is`, `getData` and
 * `match` (defaults to unknown).
 */
interface FlowContext<TEnqueueData = never, TInput = unknown> {
// Function to enqueue messages to topics.
enqueue: Enqueuer<TEnqueueData>
// Unique identifier for tracing the current execution.
traceId: string
// State management interface for persistent storage.
state: InternalStateManager
// Logger instance for structured logging.
logger: Logger
// Dictionary of configured streams accessible by name.
streams: Streams
// Information about the trigger that activated this step.
trigger: TriggerInfo
// Type guard functions to check trigger types.
is: TypeGuards<TInput>
// Extracts the data payload from input regardless of trigger type.
getData: () => ExtractDataPayload<TInput>
// Pattern matching for handling different trigger types. The resolved value
// is typed as possibly undefined.
match: <TResult = any>(
handlers: MatchHandlers<TInput, TEnqueueData, TResult>
) => Promise<TResult | undefined>
}
Properties
enqueue
Function to enqueue messages to topics.
type Enqueuer<TData> = (event: TData) => Promise<void>
Usage:
await ctx.enqueue({ topic: 'process-order', data: orderData })
traceId
Unique identifier for tracing the current execution.
state
State management interface for persistent storage.
get
<T>(groupId: string, key: string) => Promise<T | null>
Retrieve a value from state.
set
<T>(groupId: string, key: string, value: T) => Promise<StreamSetResult<T> | null>
Store a value in state.
update
<T>(groupId: string, key: string, ops: UpdateOp[]) => Promise<StreamSetResult<T> | null>
Update a value using operations.
delete
<T>(groupId: string, key: string) => Promise<T | null>
Delete a value from state.
list
<T>(groupId: string) => Promise<T[]>
List all values in a group.
clear
(groupId: string) => Promise<void>
Clear all values in a group.
logger
Logger instance for structured logging.
ctx.logger.info('Processing order', { orderId: '123' })
ctx.logger.error('Failed to process', { error })
streams
Dictionary of configured streams accessible by name.
const chatStream = ctx.streams.chat
await chatStream.set(groupId, itemId, data)
trigger
Information about the trigger that activated this step.
type TriggerInfo = {
type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
index?: number
path?: string
method?: string
topic?: string
expression?: string
}
is
Type guard functions to check trigger types.
{
queue: (input: TInput) => input is ExtractQueueInput<TInput>
http: (input: TInput) => input is ExtractApiInput<TInput>
cron: (input: TInput) => input is never
state: (input: TInput) => input is ExtractStateInput<TInput>
stream: (input: TInput) => input is ExtractStreamInput<TInput>
}
Usage:
if (ctx.is.http(input)) {
// input is typed as HTTP request
const body = input.request.body
}
getData
() => ExtractDataPayload<TInput>
Extracts the data payload from input regardless of trigger type.
- For HTTP triggers: returns `request.body`
- For queue triggers: returns the data directly
- For cron triggers: returns `undefined`
const orderData = ctx.getData() // Works for both queue and HTTP
match
Pattern matching for handling different trigger types.
return ctx.match({
http: async (request) => {
return { status: 200, body: { success: true } }
},
queue: async (data) => {
await processQueueData(data)
},
cron: async () => {
await runScheduledTask()
}
})
ApiRequest
/**
 * HTTP request shape, generic over the body type (TBody defaults to unknown).
 */
interface ApiRequest<TBody = unknown> {
// Path parameters, keyed by name; each value is a single string.
pathParams: Record<string, string>
// Query parameters; each value is a single string or an array of strings.
queryParams: Record<string, string | string[]>
// Request body, typed by TBody.
body: TBody
// Headers; each value is a single string or an array of strings.
headers: Record<string, string | string[]>
}
MotiaHttpArgs
/**
 * Pairs an HTTP request with its response handle. TBody types the request
 * body and defaults to unknown.
 */
interface MotiaHttpArgs<TBody = unknown> {
// The incoming request; its `body` field is typed by TBody.
request: MotiaHttpRequest<TBody>
// Handle used to set status/headers and write the response.
response: MotiaHttpResponse
}
/**
 * HTTP request as carried by MotiaHttpArgs. Has the same four fields as
 * ApiRequest (pathParams, queryParams, body, headers) plus `method` and
 * `requestBody`.
 */
interface MotiaHttpRequest<TBody = unknown> {
// Path parameters, keyed by name; each value is a single string.
pathParams: Record<string, string>
// Query parameters; each value is a single string or an array of strings.
queryParams: Record<string, string | string[]>
// Request body, typed by TBody.
body: TBody
// Headers; each value is a single string or an array of strings.
headers: Record<string, string | string[]>
// HTTP method, typed as a plain string (not a literal union).
method: string
// NOTE(review): ChannelReader is not defined on this page; presumably a
// reader over the raw/streamed request body — confirm.
requestBody: ChannelReader
}
/**
 * Response handle carried by MotiaHttpArgs: setters for status and headers,
 * a writable stream, and a close function.
 */
interface MotiaHttpResponse {
// Takes the numeric status code; returns nothing.
status: (statusCode: number) => void
// Takes response headers as a string-to-string record; returns nothing.
headers: (headers: Record<string, string>) => void
// Node.js writable stream; presumably where the response body is written — confirm.
stream: NodeJS.WritableStream
// Takes no arguments and returns nothing; presumably ends the response — confirm.
close: () => void
}
/**
 * Input shape tagged with `type: 'state'`. T types the old and new values.
 *
 * NOTE(review): fields here are snake_case (group_id, item_id) while
 * StreamTriggerInput uses camelCase (groupId, id) — confirm this matches the
 * actual runtime payload.
 */
type StateTriggerInput<T> = {
// Literal tag identifying this input as a state trigger.
type: 'state'
// Presumably the same groupId / key pair used by ctx.state.get/set — confirm.
group_id: string
item_id: string
// Both values are optional in the type.
old_value?: T
new_value?: T
}
/**
 * Input shape tagged with `type: 'stream'`. T types the data carried by the
 * nested StreamEvent.
 */
type StreamTriggerInput<T> = {
// Literal tag identifying this input as a stream trigger.
type: 'stream'
// Numeric timestamp; unit and epoch are not stated on this page — confirm.
timestamp: number
// Name of the stream, plus the group and item identifiers.
streamName: string
groupId: string
id: string
// The create/update/delete event, carrying data of type T.
event: StreamEvent<T>
}
/**
 * Union of three event variants discriminated by `type`
 * ('create' | 'update' | 'delete'). Every variant, including 'delete',
 * carries a `data` field of type TData.
 */
type StreamEvent<TData> =
| { type: 'create'; data: TData }
| { type: 'update'; data: TData }
| { type: 'delete'; data: TData }
Response Types
ApiResponse
/**
 * HTTP response value. TStatus narrows the status code (any number by
 * default); TBody types the body and defaults to `any`, so an unparameterized
 * ApiResponse does not type-check its body.
 */
type ApiResponse<TStatus extends number = number, TBody = any> = {
// Status code, constrained to TStatus.
status: TStatus
// Optional response headers as a string-to-string record.
headers?: Record<string, string>
// Response body; required.
body: TBody
}
Match Handlers
MatchHandlers
/**
 * Per-trigger handlers accepted by `ctx.match`. Every key is optional.
 *
 * NOTE(review): TEnqueueData is declared but not referenced by any member
 * shown here.
 */
type MatchHandlers<TInput, TEnqueueData, TResult> = {
// Queue handler; resolves to void rather than TResult.
queue?: (input: ExtractQueueInput<TInput>) => Promise<void>
// HTTP handler; resolves to TResult.
http?: (request: ExtractApiInput<TInput>) => Promise<TResult>
// Cron handler; takes no input and resolves to void.
cron?: () => Promise<void>
// State-trigger handler; resolves to TResult.
state?: (input: ExtractStateInput<TInput>) => Promise<TResult>
// Stream-trigger handler; resolves to TResult.
stream?: (input: ExtractStreamInput<TInput>) => Promise<TResult>
// Receives the un-narrowed TInput. Presumably invoked when no
// trigger-specific handler applies — confirm.
default?: (input: TInput) => Promise<TResult | undefined>
}
Stream Configuration
StreamConfig
/**
 * Configuration for a stream: a name, a schema, a base config, and optional
 * join/leave callbacks.
 */
interface StreamConfig {
// Stream name. Presumably the key under which it appears in ctx.streams — confirm.
name: string
// Schema for the stream (StepSchemaInput is not defined on this page).
schema: StepSchemaInput
// The only storage type expressible in this type is the literal 'default'.
baseConfig: { storageType: 'default' }
// Optional callback receiving the subscription, a FlowContext, and an
// optional auth context. Returns a StreamJoinResult, directly or wrapped
// (assuming PromiseOrValue<T> means T | Promise<T> — confirm).
onJoin?: (
subscription: StreamSubscription,
context: FlowContext,
authContext?: StreamContext
) => PromiseOrValue<StreamJoinResult>
// Optional callback with the same parameters as onJoin; produces no value.
onLeave?: (
subscription: StreamSubscription,
context: FlowContext,
authContext?: StreamContext
) => PromiseOrValue<void>
}
/**
 * Input describing an incoming stream connection for authentication.
 *
 * NOTE(review): StreamConfig's callbacks take a `StreamContext`, not this
 * type; where StreamAuthInput is consumed is not shown on this page — confirm.
 */
interface StreamAuthInput {
// Headers as a string-to-string record (single value per header).
headers: Record<string, string>
// Request path.
path: string
// Query parameters; every value is an array of strings, unlike ApiRequest
// where a value may also be a single string.
queryParams: Record<string, string[]>
// Presumably the remote address of the client — confirm.
addr: string
}
Usage Example
import { step, http, queue } from 'motia'
import { z } from 'zod'
// Example step with two triggers (HTTP POST /orders and the 'process-orders'
// queue) sharing a single handler.
export default step(
{
name: 'order-handler',
triggers: [
// HTTP trigger; bodySchema declares the expected request body shape.
http('POST', '/orders', {
bodySchema: z.object({ orderId: z.string(), items: z.array(z.string()) })
}),
// Queue trigger; input declares the same shape as the HTTP body above.
queue('process-orders', {
input: z.object({ orderId: z.string(), items: z.array(z.string()) })
})
],
// Matches the topic passed to ctx.enqueue in the queue handler below.
enqueues: ['send-confirmation']
},
// `input` is not read directly here; the payload is obtained via ctx.getData().
async (input, ctx) => {
// Extract the payload regardless of trigger type (request body for HTTP,
// the message data for queue).
const orderData = ctx.getData()
// Store the order in the 'orders' state group, keyed by its orderId.
await ctx.state.set('orders', orderData.orderId, orderData)
// Structured log entry with the order id as metadata.
ctx.logger.info('Processing order', { orderId: orderData.orderId })
// Write item 'order-update' into group 'user-123' of the `notifications` stream.
await ctx.streams.notifications.set(
'user-123',
'order-update',
{ message: 'Order received' }
)
// Pattern match on trigger type; only http and queue handlers are supplied.
return ctx.match({
// HTTP branch: returns a response object with status and body.
http: async (req) => {
return { status: 200, body: { success: true } }
},
// Queue branch: enqueues a follow-up message and returns nothing.
queue: async (data) => {
await ctx.enqueue({
topic: 'send-confirmation',
data: { orderId: data.orderId }
})
}
})
}
)