Simple workflow pattern
The most basic workflow connects an API endpoint to a background worker:

// Step 1: API receives request
// Configuration for the 'SendMessage' Step: one HTTP trigger on
// POST /messages, with 'message.sent' declared as the topic it enqueues.
export const config = {
  name: 'SendMessage',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/messages',
    }
  ],
  enqueues: ['message.sent']
} as const satisfies StepConfig
// HTTP handler for 'SendMessage': forwards the request's `text` onto the
// 'message.sent' topic, then answers the caller with a 200 response.
export const handler: Handlers<typeof config> = async (request, ctx) => {
  const { text } = request.body

  await ctx.enqueue({ topic: 'message.sent', data: { text } })

  return { status: 200, body: { ok: true } }
}
// Step 2: Worker processes message
// Configuration for the 'ProcessMessage' Step: a single queue trigger
// subscribed to the 'message.sent' topic.
export const config = {
  name: 'ProcessMessage',
  triggers: [
    {
      type: 'queue',
      topic: 'message.sent',
    }
  ],
} as const satisfies StepConfig
// Queue handler for 'ProcessMessage': logs every payload it receives.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Processing message', input)
}
Multi-step workflow
Build complex workflows by chaining multiple Steps:

Step 1: Create the order
Start with an API endpoint that creates an order:
// steps/create-order.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Configuration for the 'CreateOrder' Step.
// - Trigger: HTTP POST /orders, with the body described by a zod schema
//   (a `pet` with name and photoUrl, a `foodOrder` with a numeric quantity).
// - Declares 'order.created' as the topic it enqueues.
// - Belongs to the 'order-workflow' flow.
export const config = {
  name: 'CreateOrder',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/orders',
      bodySchema: z.object({
        pet: z.object({ name: z.string(), photoUrl: z.string() }),
        foodOrder: z.object({ quantity: z.number() }),
      }),
    },
  ],
  enqueues: ['order.created'],
  flows: ['order-workflow'],
} as const satisfies StepConfig
// HTTP handler for 'CreateOrder'.
// Logs the incoming order, creates the pet through `createPet` (a helper that
// is not defined in this snippet), enqueues an 'order.created' event carrying
// the new pet id, and returns the pet record with status 200.
export const handler: Handlers<typeof config> = async (request, { enqueue, logger, traceId }) => {
  const { pet, foodOrder } = request.body
  logger.info('Creating order', { pet, foodOrder, traceId })

  const petRecord = await createPet(pet)

  await enqueue({
    topic: 'order.created',
    data: {
      petId: petRecord.id,
      quantity: foodOrder.quantity,
      // Fixed placeholder on a reserved example domain: the request body
      // schema has no email field, and later Steps need a well-formed address.
      email: 'customer@example.com',
    },
  })

  return { status: 200, body: petRecord }
}
Step 2: Process the order
Process the order and create a shipping record:
// steps/process-order.step.ts
import { queue } from 'motia'
// Type-only imports: `StepConfig` checks the config below and `Handlers`
// types the handler exported from this file.
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

// Shape of the 'order.created' payload consumed by this Step.
const orderSchema = z.object({
  petId: z.string(),
  quantity: z.number(),
  email: z.string(),
})

// Configuration for the 'ProcessOrder' Step: a queue trigger on
// 'order.created' whose input is described by `orderSchema`; declares
// 'order.processed' as the topic it enqueues and joins 'order-workflow'.
export const config = {
  name: 'ProcessOrder',
  triggers: [queue('order.created', { input: orderSchema })],
  enqueues: ['order.processed'],
  flows: ['order-workflow'],
} as const satisfies StepConfig
// Queue handler for 'ProcessOrder'.
// Creates an order from the incoming payload via `createOrder` (not defined in
// this snippet), stores it in the 'orders' state namespace under its id, then
// enqueues 'order.processed' with the order id, customer email and status.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const { logger, state, enqueue, traceId } = ctx
  logger.info('Processing order', { input, traceId })

  const shipDate = new Date().toISOString()
  const order = await createOrder({ ...input, shipDate, status: 'placed' })

  await state.set('orders', order.id, order)

  const { id: orderId, status } = order
  await enqueue({
    topic: 'order.processed',
    data: { orderId, email: input.email, status },
  })
}
Step 3: Send notification
Notify the customer that their order was processed:
// steps/send-notification.step.ts
import { queue, jsonSchema } from 'motia'
// Type-only imports: `StepConfig` checks the config below and `Handlers`
// types the handler exported from this file.
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

// Configuration for the 'SendNotification' Step: a queue trigger on
// 'order.processed' whose input schema (orderId, email, status) is wrapped in
// `jsonSchema`. It declares no enqueued topics and joins 'order-workflow'.
export const config = {
  name: 'SendNotification',
  triggers: [
    queue('order.processed', {
      input: jsonSchema(
        z.object({
          orderId: z.string(),
          email: z.string(),
          status: z.string(),
        }),
      ),
    }),
  ],
  enqueues: [],
  flows: ['order-workflow'],
} as const satisfies StepConfig
// Queue handler for 'SendNotification'.
// Logs the notification with a masked email, then sends the
// 'order-confirmation' template through `emailService` (not defined in this
// snippet).
export const handler: Handlers<typeof config> = async (input, { logger, traceId }) => {
  const { orderId, email, status } = input

  // Keep the first two characters; star out every later character that still
  // has an '@' somewhere after it, so the full address stays out of the logs.
  const maskedEmail = email.replace(/(?<=.{2}).(?=.*@)/g, '*')
  logger.info('Sending notification', { orderId, email: maskedEmail, traceId })

  // Send email notification
  await emailService.send({
    to: email,
    template: 'order-confirmation',
    data: { orderId, status },
  })
}
Conditional workflows
Use trigger conditions to create branching logic:

import type { TriggerCondition } from 'motia'
// Trigger condition: passes only when the input's `amount` is strictly above 1000.
const isHighValue: TriggerCondition<{ amount: number }> = (input) => input.amount > 1000
// Trigger condition: passes only for HTTP triggers whose request body marks
// the user as verified.
const isVerifiedUser: TriggerCondition<ApiRequest<{ user: { verified: boolean } }>> = (input, ctx) => {
  if (ctx.trigger.type !== 'http') return false
  // The request body is caller-supplied, so `body` or `user` may be missing at
  // runtime; treat that as "not verified" instead of throwing a TypeError.
  return input.body?.user?.verified === true
}
// Configuration for a 'ProcessOrder' Step with two conditional triggers:
// - a queue trigger on 'order.created' (input: an object with a numeric
//   `amount`), gated by `isHighValue`;
// - an HTTP trigger on POST /orders/manual, gated by `isVerifiedUser`.
// Declares 'order.processed' as the topic it enqueues.
export const config = {
  name: 'ProcessOrder',
  triggers: [
    {
      type: 'queue',
      topic: 'order.created',
      input: z.object({ amount: z.number() }),
      condition: isHighValue, // Only process high-value orders
    },
    {
      type: 'http',
      method: 'POST',
      path: '/orders/manual',
      condition: isVerifiedUser, // Only allow verified users
    },
  ],
  enqueues: ['order.processed'],
} as const satisfies StepConfig
Multi-trigger workflows
Handle different trigger types with pattern matching:

// Configuration for the 'MultiTriggerOrder' Step, reachable three ways:
// a queue trigger on 'order.created', an HTTP POST to /orders/manual, and a
// cron schedule.
export const config = {
  name: 'MultiTriggerOrder',
  triggers: [
    queue('order.created', { input: orderSchema }),
    http('POST', '/orders/manual', { bodySchema: orderSchema }),
    {
      type: 'cron',
      expression: '0 */6 * * *', // Every 6 hours
    },
  ],
  // 'order.created' is declared as well because the handler's cron branch
  // re-enqueues pending orders onto that topic.
  enqueues: ['order.processed', 'order.created'],
} as const satisfies StepConfig
// Handler for the multi-trigger Step. `ctx.match` dispatches to one branch per
// trigger type:
// - http:  processes the request body, enqueues 'order.processed' tagged
//          'manual', and responds 200 with the order;
// - queue: processes the queued payload and enqueues 'order.processed' tagged
//          'queue';
// - cron:  lists the 'pending-orders' state entries and enqueues each one on
//          'order.created', one at a time.
// `processOrder` is not defined in this snippet.
export const handler: Handlers<typeof config> = async (_, ctx) => {
  const { logger, enqueue, state } = ctx

  return ctx.match({
    http: async ({ request }) => {
      const { body } = request
      logger.info('Processing manual order', { body })

      const order = await processOrder(body)
      await enqueue({
        topic: 'order.processed',
        data: { orderId: order.id, source: 'manual' },
      })

      return { status: 200, body: { order } }
    },

    queue: async (input) => {
      logger.info('Processing queued order', { input })

      const { id: orderId } = await processOrder(input)
      await enqueue({
        topic: 'order.processed',
        data: { orderId, source: 'queue' },
      })
    },

    cron: async () => {
      logger.info('Processing batch orders')

      const pending = await state.list('pending-orders')
      for (const data of pending) {
        await enqueue({ topic: 'order.created', data })
      }
    },
  })
}
Parallel workflows
Enqueue multiple events to process work in parallel:

// Fan-out handler: starts three enqueue calls (payment, inventory,
// confirmation) back to back and waits for all of them to settle before
// logging completion.
export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  logger.info('Starting parallel workflow')

  const { orderId, items, email } = input

  // Enqueue multiple tasks to run in parallel
  const payment = enqueue({ topic: 'process-payment', data: { orderId } })
  const inventory = enqueue({ topic: 'check-inventory', data: { items } })
  const confirmation = enqueue({ topic: 'send-confirmation', data: { email } })
  await Promise.all([payment, inventory, confirmation])

  logger.info('All tasks enqueued')
}
State-driven workflows
Use state triggers to react to state changes:

// Configuration for the 'OnOrderStateChange' Step: a state trigger on the
// 'orders' namespace for the 'set' event; declares 'order.status.changed'
// as the topic it enqueues.
export const config = {
  name: 'OnOrderStateChange',
  triggers: [
    {
      type: 'state',
      namespace: 'orders',
      event: 'set',
    },
  ],
  enqueues: ['order.status.changed'],
} as const satisfies StepConfig
// State-trigger handler: compares the `status` of the old and new values and,
// only when they differ, logs the transition and enqueues
// 'order.status.changed' with the state key as the order id.
export const handler: Handlers<typeof config> = async (input, { logger, enqueue }) => {
  const { key, new_value, old_value } = input

  // Read both statuses once, null-safely, so the comparison, the log entry and
  // the enqueued payload all agree and a missing value cannot throw.
  const oldStatus = old_value?.status
  const newStatus = new_value?.status

  if (oldStatus === newStatus) return

  logger.info('Order status changed', {
    orderId: key,
    oldStatus,
    newStatus,
  })

  await enqueue({
    topic: 'order.status.changed',
    data: {
      orderId: key,
      status: newStatus,
    },
  })
}
Workflow observability
Track workflows across Steps using `traceId`:
// Observability example: logs the trace id when the Step starts and finishes,
// and forwards the unchanged input to the 'next-step' topic in between.
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const { logger, traceId, enqueue } = ctx

  logger.info('Step started', { traceId, input })

  // The traceId is automatically propagated to enqueued events
  await enqueue({ topic: 'next-step', data: input })

  logger.info('Step completed', { traceId })
}
Error handling in workflows
Handle errors at each Step:

// Error-handling example.
// Success path: processes the order via `processOrder` (not defined in this
// snippet), stores it as 'completed' in the 'orders' state namespace and
// enqueues 'order.completed'.
// Failure path: logs the error, stores a 'failed' record under the input's
// order id and enqueues 'order.failed' with the error message.
export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue }) => {
  try {
    const result = await processOrder(input)

    await state.set('orders', result.id, {
      ...result,
      status: 'completed',
    })

    await enqueue({
      topic: 'order.completed',
      data: result,
    })
  } catch (error) {
    // A caught value is `unknown` under strict TypeScript and need not be an
    // Error at runtime, so narrow it before reading `.message`.
    const message = error instanceof Error ? error.message : String(error)

    logger.error('Order processing failed', { error, input })

    await state.set('orders', input.orderId, {
      status: 'failed',
      error: message,
    })

    await enqueue({
      topic: 'order.failed',
      data: { orderId: input.orderId, error: message },
    })
  }
}
Related concepts
Queue triggers
Learn about queue trigger configuration
Background jobs
Process work asynchronously
State triggers
React to state changes
Context API
Access workflow context