Configuration types
StepConfig
Configuration object for defining a step.
/** Configuration object for defining a step. */
type StepConfig = {
  /** Unique identifier for the step. */
  name: string
  /** Human-readable description. */
  description?: string
  /** Array of trigger configurations. */
  triggers: ReadonlyArray<TriggerConfig>
  /** Topics this step can enqueue to. */
  enqueues?: ReadonlyArray<Enqueue>
  virtualEnqueues?: ReadonlyArray<Enqueue>
  virtualSubscribes?: ReadonlyArray<string>
  /** Flow names for organization. */
  flows?: ReadonlyArray<string>
  /** Additional files to bundle. */
  includeFiles?: ReadonlyArray<string>
}
name: Unique identifier for the step
description: Human-readable description
triggers: Array of trigger configurations
enqueues: Topics this step can enqueue to
flows: Flow names for organization
includeFiles: Additional files to bundle
TriggerConfig
Union type of all trigger configurations.
/** Union type of all trigger configurations. */
type TriggerConfig =
  | QueueTrigger
  | ApiTrigger
  | CronTrigger
  | StateTrigger
  | StreamTrigger
QueueTrigger
/** Trigger configuration whose `type` discriminant is `'queue'`. */
type QueueTrigger<TSchema extends StepSchemaInput | undefined = any> = {
  type: 'queue'
  topic: string
  /** Optional schema, typed by the `TSchema` parameter. */
  input?: TSchema
  /** Optional function for conditional trigger execution. */
  condition?: TriggerCondition
  /** Optional partial override of the handler/queue infrastructure settings. */
  infrastructure?: Partial<InfrastructureConfig>
}
ApiTrigger
/** Trigger configuration whose `type` discriminant is `'http'`. */
type ApiTrigger<TSchema extends StepSchemaInput | undefined = any> = {
  type: 'http'
  path: string
  /** One of the supported HTTP methods. */
  method: ApiRouteMethod
  /** Optional schema for the request body, typed by the `TSchema` parameter. */
  bodySchema?: TSchema
  /** Optional schemas keyed by a numeric key. */
  responseSchema?: Record<number, StepSchemaInput>
  /** Optional query parameter definitions. */
  queryParams?: ReadonlyArray<QueryParam>
  middleware?: ReadonlyArray<ApiMiddleware>
  /** Optional function for conditional trigger execution. */
  condition?: TriggerCondition
}
CronTrigger
/** Trigger configuration whose `type` discriminant is `'cron'`. */
type CronTrigger = {
  type: 'cron'
  expression: string
  /** Optional function for conditional trigger execution. */
  condition?: TriggerCondition
}
StateTrigger
/** Trigger configuration whose `type` discriminant is `'state'`. */
type StateTrigger<TSchema extends StepSchemaInput | undefined = any> = {
  type: 'state'
  /** Optional condition; its input is typed from `TSchema` via `InferSchema`. */
  condition?: TriggerCondition<InferSchema<TSchema>>
}
StreamTrigger
/** Trigger configuration whose `type` discriminant is `'stream'`. */
type StreamTrigger<TSchema extends StepSchemaInput | undefined = any> = {
  type: 'stream'
  streamName: string
  groupId?: string
  itemId?: string
  /** Optional condition; its input is typed from `TSchema` via `InferSchema`. */
  condition?: TriggerCondition<InferSchema<TSchema>>
}
Handler types
StepHandler
Generic handler function signature.
/**
 * Generic handler function signature.
 *
 * Receives the trigger input and a flow context, and resolves to an
 * `ApiResponse` or to nothing.
 */
type StepHandler<TInput = any, TEnqueueData = never> = (
  input: TriggerInput<TInput>,
  ctx: FlowContext<TEnqueueData, TriggerInput<TInput>>
) => Promise<ApiResponse | void>
Handlers
Type-safe handler inferred from config.
/**
 * Type-safe handler inferred from config.
 *
 * Both the input type and the enqueue data type are derived from `TConfig`.
 */
type Handlers<TConfig extends StepConfig> = (
  input: InferHandlerInput<TConfig>,
  ctx: FlowContext<InferEnqueues<TConfig>, InferHandlerInput<TConfig>>
) => Promise<ApiResponse | void>
TriggerInput
Union of all possible trigger input types.
/** Union of all possible trigger input types. */
type TriggerInput<T> =
  // Queue trigger
  | T
  // HTTP trigger
  | ApiRequest<T>
  // Cron trigger
  | undefined
  // State trigger
  | StateTriggerInput<T>
  // Stream trigger
  | StreamTriggerInput<T>
Request and response types
ApiRequest
HTTP request interface.
/** HTTP request interface. */
interface ApiRequest<TBody = unknown> {
  pathParams: Record<string, string>
  /** Each query value is a single string or a list of strings. */
  queryParams: Record<string, string | string[]>
  /** Request body, typed by the `TBody` parameter. */
  body: TBody
  /** Each header value is a single string or a list of strings. */
  headers: Record<string, string | string[]>
}
ApiResponse
HTTP response object.
/** HTTP response object. */
type ApiResponse<TStatus extends number = number, TBody = any> = {
  /** Numeric status, typed by the `TStatus` parameter. */
  status: TStatus
  /** Optional response headers. */
  headers?: Record<string, string>
  /** Response body, typed by the `TBody` parameter. */
  body: TBody
}
Example
// Example ApiResponse value returned from a handler: status, optional headers, body.
// `result` stands for a value computed earlier in the handler (not shown here).
return {
status: 200,
headers: { 'content-type': 'application/json' },
body: { success: true, data: result },
}
MotiaHttpArgs
Low-level HTTP interface with streaming support.
/** Low-level HTTP interface with streaming support: a request/response pair. */
interface MotiaHttpArgs<TBody = unknown> {
  request: MotiaHttpRequest<TBody>
  response: MotiaHttpResponse
}
/** Request half of the low-level HTTP interface. */
interface MotiaHttpRequest<TBody = unknown> {
  pathParams: Record<string, string>
  queryParams: Record<string, string | string[]>
  /** Request body, typed by the `TBody` parameter. */
  body: TBody
  headers: Record<string, string | string[]>
  method: string
  /** The request body exposed as a `ChannelReader`. */
  requestBody: ChannelReader
}
/** Response half of the low-level HTTP interface. */
interface MotiaHttpResponse {
  /** Takes the numeric status code. */
  status: (statusCode: number) => void
  /** Takes a map of header names to values. */
  headers: (headers: Record<string, string>) => void
  /** Writable stream for the response. */
  stream: NodeJS.WritableStream
  close: () => void
}
StateTriggerInput
Input for state change triggers.
/** Input for state change triggers. */
type StateTriggerInput<T> = {
  type: 'state'
  group_id: string
  item_id: string
  /** Optional previous value. */
  old_value?: T
  /** Optional new value. */
  new_value?: T
}
Example
import type { StateTriggerInput } from 'motia'

interface Order {
  id: string
  status: string
}

/**
 * True only for changes in the 'orders' group where the old value was not
 * already 'shipped' and the new value is 'shipped'.
 */
const condition = (input: StateTriggerInput<Order>): boolean => {
  if (input.group_id !== 'orders') {
    return false
  }

  const wasShipped = input.old_value?.status === 'shipped'
  const isShipped = input.new_value?.status === 'shipped'
  return !wasShipped && isShipped
}
StreamTriggerInput
Input for stream event triggers.
/** Input for stream event triggers. */
type StreamTriggerInput<T> = {
  type: 'stream'
  timestamp: number
  streamName: string
  groupId: string
  id: string
  /** The create/update/delete event carried by this input. */
  event: StreamEvent<T>
}
/** A stream event: a `create`, `update` or `delete` tag together with its data. */
type StreamEvent<T> =
  | { type: 'create'; data: T }
  | { type: 'update'; data: T }
  | { type: 'delete'; data: T }
Example
import type { StreamTriggerInput } from 'motia'

interface Progress {
  completed: number
  total: number
}

/** True only for `update` events whose `completed` count equals `total`. */
const condition = (input: StreamTriggerInput<Progress>): boolean => {
  const { event } = input
  if (event.type !== 'update') {
    return false
  }

  const { completed, total } = event.data
  return completed === total
}
Context types
FlowContext
See FlowContext API for detailed documentation.
/** Context object passed to handlers and trigger conditions. */
interface FlowContext<TEnqueueData = never, TInput = unknown> {
  /** Function for enqueueing events. */
  enqueue: Enqueuer<TEnqueueData>
  traceId: string
  state: InternalStateManager
  logger: Logger
  streams: Streams
  /** Metadata about the current trigger. */
  trigger: TriggerInfo
  is: TriggerTypeGuards<TInput>
  getData: () => ExtractDataPayload<TInput>
  /** Resolves to the handlers' result type, or `undefined`. */
  match: <TResult>(
    handlers: MatchHandlers<TInput, TEnqueueData, TResult>
  ) => Promise<TResult | undefined>
}
TriggerInfo
Metadata about the current trigger.
/** Metadata about the current trigger. */
type TriggerInfo = {
  /** Kind of trigger. */
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
  index?: number
  path?: string
  method?: string
  topic?: string
  expression?: string
}
Enqueuer
Function type for enqueueing events.
/**
 * Function type for enqueueing events.
 *
 * Takes one event object (topic, data, optional message group id) and
 * resolves with no value.
 */
type Enqueuer<TData> = (event: {
  topic: string
  data: TData
  messageGroupId?: string
}) => Promise<void>
Schema types
StepSchemaInput
Accepted schema types for validation.
/** Accepted schema types for validation. */
type StepSchemaInput =
  | ZodType
  | JsonSchema
  | TypedJsonSchema
TypedJsonSchema
JSON Schema with TypeScript type inference.
/** JSON Schema with TypeScript type inference. */
type TypedJsonSchema<T = unknown> = JsonSchema & {
  /** Optional, read-only member that carries the TypeScript type `T`. */
  readonly __phantomType?: T
}
Example
import { jsonSchema } from 'motia'
// NOTE(review): assumes `InferSchema` is exported from 'motia' — confirm.
import type { InferSchema } from 'motia'
import { z } from 'zod'

const schema = jsonSchema(
  z.object({
    name: z.string(),
    age: z.number(),
  })
)

// `schema` is the typed JSON Schema produced by jsonSchema(), not a Zod schema,
// so `z.infer` cannot read its type. `InferSchema` is the helper the trigger
// types use to turn a schema into its TypeScript type.
type User = InferSchema<typeof schema>
Infrastructure types
InfrastructureConfig
Configuration for handler and queue infrastructure.
/** Configuration for handler and queue infrastructure. */
type InfrastructureConfig = {
  /** Optional handler settings; any subset of `HandlerConfig`. */
  handler?: Partial<HandlerConfig>
  /** Optional queue settings; any subset of `QueueConfig`. */
  queue?: Partial<QueueConfig>
}
/** Handler infrastructure settings. */
type HandlerConfig = {
  /** Memory in MB. */
  ram: number
  /** CPU units. */
  cpu?: number
  /** Timeout in seconds. */
  timeout: number
}
/** Queue infrastructure settings. */
type QueueConfig = {
  /** Queue kind: `'fifo'` or `'standard'`. */
  type: 'fifo' | 'standard'
  maxRetries: number
  visibilityTimeout: number
  delaySeconds: number
}
Example
import { queue } from 'motia'

// `dataSchema` is a schema defined elsewhere; it is not shown in this snippet.
const trigger = queue('heavy-processing', {
  input: dataSchema,
  infrastructure: {
    // Handler settings: `ram` is memory in MB, `timeout` is in seconds.
    handler: {
      ram: 2048,
      timeout: 300,
    },
    // Queue settings; only a subset of QueueConfig needs to be given.
    queue: {
      type: 'fifo',
      maxRetries: 3,
      visibilityTimeout: 120,
    },
  },
})
Stream types
StreamConfig
Configuration for defining a stream.
/** Configuration for defining a stream. */
interface StreamConfig {
  name: string
  /** Schema for the stream, in any accepted schema type. */
  schema: StepSchemaInput
  baseConfig: { storageType: 'default' }
  /** Optional hook; receives the subscription, the flow context and an optional auth context. */
  onJoin?: (
    subscription: StreamSubscription,
    context: FlowContext,
    authContext?: StreamContext
  ) => Promise<StreamJoinResult>
  /** Optional hook with the same arguments as `onJoin`; resolves with no value. */
  onLeave?: (
    subscription: StreamSubscription,
    context: FlowContext,
    authContext?: StreamContext
  ) => Promise<void>
}
/** Subscription passed to the `onJoin` / `onLeave` stream hooks. */
type StreamSubscription = {
  groupId: string
  id?: string
}
StreamSetResult
Result from set/update operations.
/** Result from set/update operations. */
type StreamSetResult<T> = {
  /** The value, typed by the `T` parameter. */
  value: T
  version: number
  timestamp: number
}
UpdateOp
JSON Patch operations for updates.
/** JSON Patch operations for updates. */
type UpdateOp =
  | { op: 'add'; path: string; value: any }
  | { op: 'remove'; path: string }
  | { op: 'replace'; path: string; value: any }
  | { op: 'move'; from: string; path: string }
  | { op: 'copy'; from: string; path: string }
  | { op: 'test'; path: string; value: any }
Utility types
TriggerCondition
Function type for conditional trigger execution.
/**
 * Function type for conditional trigger execution.
 *
 * Receives the trigger input and a flow context, and returns a boolean
 * either directly or through a promise.
 */
type TriggerCondition<TInput = unknown> = (
  input: TriggerInput<TInput>,
  ctx: FlowContext<never, TriggerInput<TInput>>
) => boolean | Promise<boolean>
Enqueue
Enqueue configuration.
/** Enqueue configuration: a plain string, or an object with a topic and optional flags. */
type Enqueue =
  | string
  | {
      topic: string
      label?: string
      conditional?: boolean
    }
ApiRouteMethod
Supported HTTP methods.
/** Supported HTTP methods. */
type ApiRouteMethod =
  | 'GET' | 'POST' | 'PUT' | 'DELETE'
  | 'PATCH' | 'OPTIONS' | 'HEAD'
QueryParam
Query parameter definition.
/** Query parameter definition. */
interface QueryParam {
  name: string
  description: string
}
Type inference
Motia provides automatic type inference from configurations:
import { step, http, queue } from 'motia'
// StepConfig is only used in `satisfies`, so a type-only import is enough.
import type { StepConfig } from 'motia'
import { z } from 'zod'

// One schema shared by the HTTP body and the queue input.
const dataSchema = z.object({
  userId: z.string(),
  amount: z.number(),
})

// `as const satisfies StepConfig` checks the object against StepConfig while
// keeping its literal types, which the handler's inferred types depend on.
const config = {
  name: 'ProcessPayment',
  triggers: [
    http('POST', '/payments', { bodySchema: dataSchema }),
    queue('payments', { input: dataSchema }),
  ],
  enqueues: ['notifications'],
} as const satisfies StepConfig

// Handler input type is automatically inferred
export const { handler } = step(config, async (input, ctx) => {
  // TypeScript knows the shape of input based on triggers
  if (ctx.is.http(input)) {
    // input.body: { userId: string, amount: number }
  }

  if (ctx.is.queue(input)) {
    // input: { userId: string, amount: number }
  }

  // ctx.enqueue is type-checked against 'notifications'
  await ctx.enqueue({
    topic: 'notifications',
    data: { message: 'Payment processed' },
  })
})