Processing queue events
Create a background worker by defining a Step with a queue trigger:
// steps/process-greeting.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
/** Shape of the payload used as the queue trigger's input: four string fields. */
const inputSchema = z.object({
  timestamp: z.string(),
  appName: z.string(),
  greetingPrefix: z.string(),
  requestId: z.string(),
})
/**
 * Step definition for the background greeting worker.
 *
 * Declares one queue trigger on the `process-greeting` topic with
 * `inputSchema` as its input schema, enqueues nothing, and belongs to
 * `hello-world-flow`.
 */
export const config = {
  name: 'ProcessGreeting',
  description: 'Processes greeting in the background',
  triggers: [{ type: 'queue', topic: 'process-greeting', input: inputSchema }],
  enqueues: [],
  flows: ['hello-world-flow'],
} as const satisfies StepConfig
/**
 * Builds a greeting from the queued payload and stores it in state.
 *
 * The greeting is `"<greetingPrefix> <appName>!"`. It is written to the
 * `greetings` state group under `requestId`, together with the processing
 * time and the timestamp carried by the payload.
 */
export const handler: Handlers<typeof config> = async (input, { logger, state }) => {
  const { timestamp: originalTimestamp, appName, greetingPrefix, requestId } = input

  logger.info('Processing greeting', { requestId, appName })

  const greeting = `${greetingPrefix} ${appName}!`
  const record = {
    greeting,
    processedAt: new Date().toISOString(),
    originalTimestamp,
  }
  await state.set('greetings', requestId, record)

  logger.info('Greeting processed successfully', { requestId, greeting, storedInState: true })
}
Building an event-driven workflow
Connect multiple Steps together with queue events:
Create the API endpoint
Start with an HTTP endpoint that enqueues work:
// steps/api.step.ts
/**
 * Step definition for the order-creation endpoint.
 *
 * Declares an HTTP `POST /orders` trigger whose body schema requires a `pet`
 * (name and photo URL) and accepts an optional `foodOrder` with a quantity.
 * The Step may enqueue to the `process-food-order` topic.
 */
export const config = {
  name: 'CreateOrder',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/orders',
      bodySchema: z.object({
        pet: z.object({ name: z.string(), photoUrl: z.string() }),
        foodOrder: z.object({ quantity: z.number() }).optional(),
      }),
    },
  ],
  enqueues: ['process-food-order'],
} as const satisfies StepConfig
/**
 * Creates a pet from the request body and, when a food order is included,
 * enqueues it on `process-food-order` for background processing.
 *
 * @param request - Incoming HTTP request; `request.body` holds `pet` and an
 *   optional `foodOrder`.
 * @returns A 200 response whose body is the value returned by
 *   `petStoreService.createPet`.
 */
export const handler: Handlers<typeof config> = async (request, { enqueue }) => {
  const { pet, foodOrder } = request.body
  // `petStoreService` is not defined in this snippet; it must be in scope in the real file.
  const newPet = await petStoreService.createPet(pet)

  if (foodOrder) {
    await enqueue({
      topic: 'process-food-order',
      data: {
        quantity: foodOrder.quantity,
        // Hard-coded placeholder recipient: the request body schema has no
        // email field. Uses a reserved example domain so it is a well-formed
        // address that can never reach a real mailbox.
        email: 'test@example.com',
        petId: newPet.id,
      },
    })
  }

  return { status: 200, body: newPet }
}
Process the order
Create a worker to process orders:
// steps/process-order.step.ts
import { queue } from 'motia'
import { z } from 'zod'
/** Shape of a queued food order: recipient email, quantity, and pet id. */
const orderSchema = z.object({
  email: z.string(),
  quantity: z.number(),
  petId: z.string(),
})
/**
 * Step definition for the food-order worker.
 *
 * Triggered by the `process-food-order` queue with `orderSchema` as its input
 * schema; may enqueue to `notification`; part of the `pet-store` flow.
 */
export const config = {
  name: 'ProcessFoodOrder',
  description: 'Process food orders in the background',
  flows: ['pet-store'],
  triggers: [
    queue('process-food-order', { input: orderSchema }),
  ],
  enqueues: ['notification'],
} as const satisfies StepConfig
/**
 * Places a food order, stores it in state, and requests a notification.
 *
 * The queued input is passed to `petStoreService.createOrder` with a ship date
 * of "now" and status `placed`. The resulting order is saved in the `orders`
 * state group under its id, then a `new-order` notification is enqueued for
 * the input's email address.
 */
export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue }) => {
  logger.info('Processing food order', { input })

  const orderRequest = {
    ...input,
    shipDate: new Date().toISOString(),
    status: 'placed',
  }
  const placedOrder = await petStoreService.createOrder(orderRequest)

  await state.set('orders', placedOrder.id, placedOrder)

  await enqueue({
    topic: 'notification',
    data: {
      email: input.email,
      templateId: 'new-order',
      templateData: {
        orderId: placedOrder.id,
        quantity: placedOrder.quantity,
        status: placedOrder.status,
      },
    },
  })
}
Send notifications
Create a final Step to send notifications:
// steps/notification.step.ts
import { queue, jsonSchema } from 'motia'
import { z } from 'zod'
/**
 * Step definition for the notification sender.
 *
 * Triggered by the `notification` queue. The input schema is a zod object
 * (template id, email, free-form template data) wrapped in `jsonSchema`.
 * The Step enqueues nothing.
 */
export const config = {
  name: 'Notification',
  description: 'Sends notifications to users',
  triggers: [
    queue('notification', {
      input: jsonSchema(
        z.object({
          templateId: z.string(),
          email: z.string(),
          templateData: z.record(z.string(), z.any()),
        }),
      ),
    }),
  ],
  enqueues: [],
} as const satisfies StepConfig
/**
 * Logs an outgoing notification with the recipient address partially masked.
 *
 * Masking replaces with `*` every character that has at least two characters
 * before it and an `@` somewhere after it, e.g. `john@x.com` -> `jo**@x.com`.
 * The actual send is left as a commented-out placeholder.
 */
export const handler: Handlers<typeof config> = async (input, { logger }) => {
  const { email, templateId, templateData } = input

  const maskedEmail = email.replace(/(?<=.{2}).(?=.*@)/g, '*')
  logger.info('Sending notification', { email: maskedEmail, templateId })

  // Send email via your email service
  // await emailService.send({ email, templateId, templateData })
}
Scheduled jobs with cron
Run jobs on a schedule using cron triggers:
// steps/periodic-job.step.ts
import type { Handlers, StepConfig } from 'motia'
/**
 * Step definition for a recurring job.
 *
 * Declares a single cron trigger with expression `* * * * *` and may enqueue
 * to the `periodic-job-handled` topic.
 */
export const config = {
  name: 'HandlePeriodicJob',
  description: 'Runs every minute',
  triggers: [
    { type: 'cron', expression: '* * * * *' }, // Every minute
  ],
  enqueues: ['periodic-job-handled'],
} as const satisfies StepConfig
/**
 * Logs that the scheduled job ran and enqueues a `periodic-job-handled`
 * message carrying the same text and the current ISO timestamp.
 * The trigger input is ignored.
 */
export const handler: Handlers<typeof config> = async (_input, { logger, enqueue }) => {
  const message = 'Periodic job executed'
  logger.info(message)

  const timestamp = new Date().toISOString()
  await enqueue({
    topic: 'periodic-job-handled',
    data: { message, timestamp },
  })
}
Multi-trigger Steps
Create Steps that can be triggered by multiple sources:
import { http, queue, step } from 'motia'
import { z } from 'zod'
/**
 * Order payload shape, used both as the queue trigger's input schema and as
 * the HTTP trigger's body schema.
 */
const orderSchema = z.object({
  email: z.string(),
  quantity: z.number(),
  petId: z.string(),
})
/**
 * Definition of a Step with two triggers that share `orderSchema`: the
 * `process-order` queue and HTTP `POST /process-order`. May enqueue to
 * `notification`.
 */
export const stepConfig = {
  name: 'ProcessOrder',
  triggers: [
    queue('process-order', {
      input: orderSchema,
    }),
    http('POST', '/process-order', {
      bodySchema: orderSchema,
    }),
  ],
  enqueues: ['notification'],
}
/**
 * Wraps `stepConfig` and its handler with `step(...)`, exporting the resulting
 * `config` and `handler`.
 *
 * The handler reads the payload through `ctx.getData()`, logs it with the
 * trigger type, passes it to `processOrder`, and returns a 200 response from
 * the `http` branch of `ctx.match`.
 *
 * NOTE(review): only an `http` branch is supplied and nothing is enqueued to
 * the declared `notification` topic — confirm this is the intended behavior
 * for queue-triggered runs.
 */
export const { config, handler } = step(stepConfig, async (_input, ctx) => {
  const payload = ctx.getData()

  ctx.logger.info('Processing order', { triggerType: ctx.trigger.type, data: payload })

  const order = await processOrder(payload)

  return ctx.match({
    http: async () => {
      return { status: 200, body: { success: true, order } }
    },
  })
})
Batch processing with cron
Process batches of items on a schedule:
/**
 * Step definition for the batch processor.
 *
 * Declares a single cron trigger with expression `0 *\/6 * * *` (written
 * without the backslash) and may enqueue to the `process-item` topic.
 */
export const config = {
  name: 'BatchProcessor',
  triggers: [
    { type: 'cron', expression: '0 */6 * * *' }, // Every 6 hours
  ],
  enqueues: ['process-item'],
} as const satisfies StepConfig
/**
 * Lists everything in the `pending-items` state group and enqueues each entry
 * on `process-item`, one at a time and in list order. Logs the start, the
 * number of items found, and completion. The trigger input is ignored.
 */
export const handler: Handlers<typeof config> = async (_, { logger, state, enqueue }) => {
  logger.info('Starting batch processing')

  const pendingItems = await state.list('pending-items')
  logger.info(`Found ${pendingItems.length} pending items`)

  // Each enqueue is awaited before the next one starts.
  for (const pending of pendingItems) {
    await enqueue({ topic: 'process-item', data: pending })
  }

  logger.info('Batch enqueued successfully')
}
Job retries and error handling
Handle failures gracefully:
/**
 * Processes a job and records its outcome in the `jobs` state group under
 * `input.id`.
 *
 * On success the job is marked `completed`. On failure the error is logged,
 * the job is marked `failed` with the error message, and the original error
 * is re-thrown.
 *
 * @throws Whatever `processData` or `state.set` threw.
 */
export const handler: Handlers<typeof config> = async (input, { logger, state }) => {
  try {
    logger.info('Processing job', { input })
    await processData(input)
    await state.set('jobs', input.id, {
      status: 'completed',
      completedAt: new Date().toISOString(),
    })
  } catch (error) {
    // Under `strict`, a catch variable is `unknown`, and anything can be
    // thrown. Narrow before reading `.message` so this compiles and so
    // non-Error throws still produce a readable failure record.
    const errorMessage = error instanceof Error ? error.message : String(error)

    logger.error('Job processing failed', { error, input })
    await state.set('jobs', input.id, {
      status: 'failed',
      error: errorMessage,
      failedAt: new Date().toISOString(),
    })
    throw error // Re-throw to trigger retry
  }
}
Queue strategies
Configure queue behavior in iii-config.yaml:
# Per-queue settings, nested under `queues` by queue name.
# NOTE(review): retryDelay is presumably in milliseconds — confirm against the
# queue configuration reference.
queues:
  process-order:
    strategy: fifo
    maxRetries: 3
    retryDelay: 1000
  send-email:
    strategy: priority
    maxConcurrency: 10
Related concepts
Queue triggers
Learn about queue trigger configuration
Cron triggers
Schedule recurring jobs
Workflows
Build complex multi-step workflows
State management
Store job state and results