Workflows
Motia enables you to build complex workflows by orchestrating multiple steps through events, state, and streams. Workflows can execute in parallel, sequentially, or conditionally based on your business logic.
Event-Driven Orchestration
Steps communicate through events using the enqueue function:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Shape of the POST /orders request body; referenced as `bodySchema` by the
// CreateOrder trigger below.
const orderSchema = z.object({
  // Address the order-confirmation notification is sent to
  email: z.string(),
  productId: z.string(),
  quantity: z.number(),
})
/**
 * Step definition for `CreateOrder`.
 *
 * - Trigger: HTTP `POST /orders`, with `orderSchema` as the body schema.
 * - `enqueues` lists the two topics the handler publishes to:
 *   `order.process` and `notification.send`.
 * - The step is part of the `order-workflow` flow.
 */
export const config = {
  name: 'CreateOrder',
  triggers: [{ type: 'http', method: 'POST', path: '/orders', bodySchema: orderSchema }],
  enqueues: ['order.process', 'notification.send'],
  flows: ['order-workflow'],
} as const satisfies StepConfig
/**
 * Creates an order from the request body, stores it under the `orders` scope,
 * and enqueues the downstream `order.process` and `notification.send` messages.
 *
 * The request body is spread first and the server-generated fields (`id`,
 * `status`, `createdAt`) are written afterwards, so a body field with the same
 * name can never replace them. This matters because `order.id` is also the
 * state key and the id handed to downstream steps.
 *
 * @returns A 201 response whose body contains the stored order.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state, logger }) => {
  const order = {
    ...request.body,
    id: crypto.randomUUID(),
    status: 'created',
    createdAt: new Date().toISOString(),
  }

  await state.set('orders', order.id, order)
  logger.info('Order created', { orderId: order.id })

  // Trigger downstream steps
  await enqueue({
    topic: 'order.process',
    data: { orderId: order.id },
  })
  await enqueue({
    topic: 'notification.send',
    data: {
      email: order.email,
      template: 'order-confirmation',
      data: order,
    },
  })

  return {
    status: 201,
    body: { order },
  }
}
Sequential Workflows
Chain steps together for sequential processing:
// Step 1: Validate Payment
/**
 * Step definition for `ValidatePayment`: triggered by messages on the
 * `order.process` queue topic, declares `inventory.reserve` as the topic it
 * enqueues to, and is part of the `order-workflow` flow.
 */
export const validatePaymentConfig = {
  name: 'ValidatePayment',
  triggers: [{ type: 'queue', topic: 'order.process' }],
  enqueues: ['inventory.reserve'],
  flows: ['order-workflow'],
} as const satisfies StepConfig
/**
 * Validates payment for the order named in the queue message.
 *
 * - Order not found in state: logs a warning and stops, so an unknown order id
 *   is never updated or handed to the next step.
 * - Payment valid: sets `paymentStatus` to `validated` and enqueues
 *   `inventory.reserve` for the same order.
 * - Payment invalid: sets `status` to `payment-failed`; nothing is enqueued.
 *
 * @param input Queue message; only `orderId` is read.
 */
export const validatePaymentHandler: Handlers<typeof validatePaymentConfig> = async (
  input,
  { enqueue, state, logger }
) => {
  const { orderId } = input
  const order = await state.get('orders', orderId)

  if (!order) {
    logger.warn('Order not found, skipping payment validation', { orderId })
    return
  }

  logger.info('Validating payment', { orderId })

  // Simulate payment validation
  const paymentValid = true // Call payment provider

  if (paymentValid) {
    await state.update('orders', orderId, [
      { type: 'set', path: 'paymentStatus', value: 'validated' },
    ])
    // Continue to next step
    await enqueue({
      topic: 'inventory.reserve',
      data: { orderId },
    })
  } else {
    await state.update('orders', orderId, [
      { type: 'set', path: 'status', value: 'payment-failed' },
    ])
  }
}
// Step 2: Reserve Inventory
/**
 * Step definition for `ReserveInventory`: triggered by messages on the
 * `inventory.reserve` queue topic (the topic ValidatePayment enqueues to),
 * declares `shipping.create` as the topic it enqueues to, and is part of the
 * `order-workflow` flow.
 */
export const reserveInventoryConfig = {
  name: 'ReserveInventory',
  triggers: [{ type: 'queue', topic: 'inventory.reserve' }],
  enqueues: ['shipping.create'],
  flows: ['order-workflow'],
} as const satisfies StepConfig
/**
 * Reserves inventory for the order named in the queue message.
 *
 * - Order not found in state: logs a warning and stops, so an unknown order id
 *   is never updated or handed to shipping.
 * - Reservation succeeded: sets `inventoryStatus` to `reserved` and enqueues
 *   `shipping.create` for the same order.
 * - Reservation failed: logs a warning so the stalled order is visible;
 *   nothing is enqueued.
 *
 * @param input Queue message; only `orderId` is read.
 */
export const reserveInventoryHandler: Handlers<typeof reserveInventoryConfig> = async (
  input,
  { enqueue, state, logger }
) => {
  const { orderId } = input
  const order = await state.get('orders', orderId)

  if (!order) {
    logger.warn('Order not found, skipping inventory reservation', { orderId })
    return
  }

  logger.info('Reserving inventory', { orderId })

  // Reserve inventory
  const reserved = true // Check inventory system

  if (reserved) {
    await state.update('orders', orderId, [
      { type: 'set', path: 'inventoryStatus', value: 'reserved' },
    ])
    // Continue to shipping
    await enqueue({
      topic: 'shipping.create',
      data: { orderId },
    })
  } else {
    // Without this the workflow would simply stop here with no trace.
    logger.warn('Inventory reservation failed', { orderId })
  }
}
// Step 3: Create Shipment
/**
 * Step definition for `CreateShipment`: triggered by messages on the
 * `shipping.create` queue topic and part of the `order-workflow` flow.
 * It declares no `enqueues`, making it the last step of the chain shown here.
 */
export const createShipmentConfig = {
  name: 'CreateShipment',
  triggers: [{ type: 'queue', topic: 'shipping.create' }],
  flows: ['order-workflow'],
} as const satisfies StepConfig
/**
 * Final step of the order chain: marks the order as shipped and records the
 * shipping time as an ISO-8601 string.
 *
 * @param input Queue message; only `orderId` is read.
 */
export const createShipmentHandler: Handlers<typeof createShipmentConfig> = async (
  input,
  { state, logger }
) => {
  const orderId = input.orderId
  logger.info('Creating shipment', { orderId })

  // Both fields are written in a single update call.
  const shippedAt = new Date().toISOString()
  await state.update('orders', orderId, [
    { type: 'set', path: 'status', value: 'shipped' },
    { type: 'set', path: 'shippedAt', value: shippedAt },
  ])

  logger.info('Order workflow completed', { orderId })
}
Parallel Execution
Execute multiple steps in parallel for better performance:
import type { Handlers, StepConfig } from 'motia'
/**
 * Step definition for `ParallelDataGathering`.
 *
 * - Trigger: HTTP `POST /analyze/:userId`.
 * - `enqueues` lists the three fan-out topics the handler publishes to:
 *   `fetch.profile`, `fetch.activity` and `fetch.preferences`.
 * - The step is part of the `data-aggregation` flow.
 */
export const config = {
  name: 'ParallelDataGathering',
  triggers: [{ type: 'http', method: 'POST', path: '/analyze/:userId' }],
  enqueues: ['fetch.profile', 'fetch.activity', 'fetch.preferences'],
  flows: ['data-aggregation'],
} as const satisfies StepConfig
/**
 * Starts an analysis run for the user in the route parameters: stores the
 * initial aggregation record and fans out three fetch messages at once.
 *
 * The record is created with `completedTasks: 0` because each fetcher
 * increments that counter and the aggregation trigger waits for it to equal 3;
 * starting from an explicit zero means the count never depends on how an
 * increment treats a missing field.
 *
 * @returns A 202 response carrying the new `analysisId`.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state }) => {
  const { userId } = request.params
  const analysisId = crypto.randomUUID()

  // Initialize aggregation state
  await state.set('analysis', analysisId, {
    userId,
    status: 'gathering',
    results: {},
    completedTasks: 0,
    startedAt: Date.now(),
  })

  // Trigger parallel data fetching
  await Promise.all([
    enqueue({
      topic: 'fetch.profile',
      data: { analysisId, userId },
    }),
    enqueue({
      topic: 'fetch.activity',
      data: { analysisId, userId },
    }),
    enqueue({
      topic: 'fetch.preferences',
      data: { analysisId, userId },
    }),
  ])

  return {
    status: 202,
    body: { analysisId, status: 'gathering' },
  }
}
Parallel Merge Pattern
import type { Handlers, StepConfig } from 'motia'
// Individual data fetcher
/**
 * Step definition for `FetchProfile`: triggered by messages on the
 * `fetch.profile` queue topic, declares `analysis.aggregate` as the topic it
 * enqueues to, and is part of the `data-aggregation` flow.
 */
export const fetchProfileConfig = {
  name: 'FetchProfile',
  triggers: [{ type: 'queue', topic: 'fetch.profile' }],
  enqueues: ['analysis.aggregate'],
  flows: ['data-aggregation'],
} as const satisfies StepConfig
/**
 * One of the parallel fetchers: writes its partial result into the shared
 * analysis record, bumps the completion counter, and publishes
 * `analysis.aggregate`.
 *
 * NOTE(review): the aggregation step shown on this page is triggered by state
 * changes, not by the `analysis.aggregate` topic — confirm whether anything
 * consumes this message.
 *
 * @param input Queue message carrying `analysisId` and `userId`.
 */
export const fetchProfileHandler: Handlers<typeof fetchProfileConfig> = async (
  input,
  { enqueue, state }
) => {
  // `userId` is not used by the placeholder lookup below.
  const { analysisId, userId } = input

  // Fetch profile data
  const profileData = { name: 'John Doe', age: 30 } // From database

  // Store partial result; result and counter are sent in one update call
  await state.update('analysis', analysisId, [
    { type: 'set', path: 'results.profile', value: profileData },
    { type: 'increment', path: 'completedTasks', by: 1 },
  ])

  // Notify aggregator
  await enqueue({ topic: 'analysis.aggregate', data: { analysisId } })
}
// Aggregation step (triggered by state)
/**
 * Step definition for `AggregateResults`, part of the `data-aggregation` flow.
 *
 * Its single trigger is a state trigger on the `analysis` scope whose
 * `condition` accepts only `update` events where the record's
 * `completedTasks` equals 3 — the number of fetch topics fanned out by
 * `ParallelDataGathering`.
 */
export const aggregateConfig = {
  name: 'AggregateResults',
  triggers: [
    {
      type: 'state',
      scope: 'analysis',
      // True only for the update that brings the counter to exactly 3.
      condition: (input) => {
        return (
          input.event.type === 'update' &&
          input.event.data.completedTasks === 3
        )
      },
    },
  ],
  flows: ['data-aggregation'],
} as const satisfies StepConfig
/**
 * Runs once the aggregation trigger fires: logs how long the parallel phase
 * took together with the gathered results, then marks the analysis record as
 * completed.
 *
 * @param input State event; reads `input.id` (the analysis id) and
 *   `input.event.data` (the analysis record, including `startedAt` and
 *   `results`).
 */
export const aggregateHandler: Handlers<typeof aggregateConfig> = async (input, { state, logger }) => {
  const record = input.event.data
  // Milliseconds between the record's `startedAt` and now.
  const elapsedMs = Date.now() - record.startedAt

  logger.info('All parallel tasks completed', {
    analysisId: input.id,
    duration: elapsedMs,
    results: record.results,
  })

  const completedAt = Date.now()
  await state.update('analysis', input.id, [
    { type: 'set', path: 'status', value: 'completed' },
    { type: 'set', path: 'completedAt', value: completedAt },
  ])
}
Conditional Workflows
Route workflows based on conditions:
import type { Handlers, StepConfig } from 'motia'
/**
 * Step definition for `ProcessTransaction`: triggered by messages on the
 * `transaction.created` queue topic and part of the `payment-workflow` flow.
 *
 * `enqueues` declares `fraud.check`, `payment.process` and
 * `transaction.reject`; the handler shown with this config publishes only to
 * the first two.
 */
export const config = {
  name: 'ProcessTransaction',
  triggers: [{ type: 'queue', topic: 'transaction.created' }],
  enqueues: ['fraud.check', 'payment.process', 'transaction.reject'],
  flows: ['payment-workflow'],
} as const satisfies StepConfig
/**
 * Routes a transaction down one of two paths based on its amount.
 *
 * - `amount > 10000`: stored with status `fraud-check` and sent to
 *   `fraud.check` with the full `{ transactionId, amount, userId }` payload.
 * - otherwise: stored with status `processing` and sent to `payment.process`
 *   with just the `transactionId`.
 *
 * @param input Queue message; reads `transactionId`, `amount` and `userId`.
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, state, logger }) => {
  const { transactionId, amount, userId } = input
  logger.info('Processing transaction', { transactionId, amount })

  // High-value transactions require fraud check
  const needsFraudCheck = amount > 10000

  logger.info(
    needsFraudCheck
      ? 'High-value transaction, initiating fraud check'
      : 'Standard transaction, processing payment',
    { transactionId }
  )

  await state.set('transactions', transactionId, {
    id: transactionId,
    status: needsFraudCheck ? 'fraud-check' : 'processing',
    amount,
    userId,
  })

  await enqueue(
    needsFraudCheck
      ? { topic: 'fraud.check', data: { transactionId, amount, userId } }
      : { topic: 'payment.process', data: { transactionId } }
  )
}
Multi-Path Routing
/**
 * Forwards the incoming message, unchanged, to a topic chosen from its
 * `orderType`:
 *
 * - `express`   -> `express.fulfillment`
 * - `standard`  -> `priority.queue` when `priority` is `high` or `value`
 *                  exceeds 1000, otherwise `standard.queue`
 * - `backorder` -> `inventory.notify`
 * - anything else is logged as a warning and sent to `manual.review`
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  const { orderType, priority, value } = input

  if (orderType === 'express') {
    await enqueue({ topic: 'express.fulfillment', data: input })
  } else if (orderType === 'standard') {
    const isPriority = priority === 'high' || value > 1000
    await enqueue({ topic: isPriority ? 'priority.queue' : 'standard.queue', data: input })
  } else if (orderType === 'backorder') {
    await enqueue({ topic: 'inventory.notify', data: input })
  } else {
    logger.warn('Unknown order type', { orderType })
    await enqueue({ topic: 'manual.review', data: input })
  }
}
Multi-Trigger Steps
Create versatile steps that respond to multiple trigger types:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Order payload shared by the HTTP trigger (as `bodySchema`) and the queue
// trigger (as `input`) of the ProcessOrder step below.
const orderSchema = z.object({
  amount: z.number(),
  description: z.string(),
})
/**
 * Step definition for `ProcessOrder`, a single step reachable through three
 * different trigger types. It declares `order.processed` as the topic it
 * enqueues to and is part of the `order-workflow` flow.
 */
export const config = {
  name: 'ProcessOrder',
  triggers: [
    // API trigger for manual orders
    {
      type: 'http',
      method: 'POST',
      path: '/orders/manual',
      bodySchema: orderSchema,
    },
    // Queue trigger for automated orders
    {
      type: 'queue',
      topic: 'order.created',
      input: orderSchema,
    },
    // Cron trigger for batch processing
    {
      type: 'cron',
      expression: '0 2 * * *', // Daily at 2 AM
    },
  ],
  enqueues: ['order.processed'],
  flows: ['order-workflow'],
} as const satisfies StepConfig
/**
 * Handles all three triggers of `ProcessOrder` by dispatching through
 * `ctx.match`:
 *
 * - `http`: stores the posted order, enqueues `order.processed`, returns 200.
 * - `queue`: stores the queued order and enqueues `order.processed`.
 * - `cron`: lists the `pending-orders` scope and enqueues `order.processed`
 *   once per pending order.
 *
 * In the `http` and `queue` branches the payload is spread first and the
 * server-side fields (`id`, `source`, `createdAt`) are written afterwards, so
 * the stored record's `id` always equals the key it is stored under and the
 * id sent downstream, even if the payload carries its own `id`.
 */
export const handler: Handlers<typeof config> = async (_, ctx) => {
  const orderId = crypto.randomUUID()

  return ctx.match({
    http: async ({ request }) => {
      const order = {
        ...request.body,
        id: orderId,
        source: 'manual-api',
        createdAt: new Date().toISOString(),
      }
      await ctx.state.set('orders', orderId, order)
      await ctx.enqueue({ topic: 'order.processed', data: { orderId } })
      return {
        status: 200,
        body: { message: 'Order processed', orderId },
      }
    },
    queue: async (queueInput) => {
      const order = {
        ...queueInput,
        id: orderId,
        source: 'event',
        createdAt: new Date().toISOString(),
      }
      await ctx.state.set('orders', orderId, order)
      await ctx.enqueue({ topic: 'order.processed', data: { orderId } })
    },
    cron: async () => {
      ctx.logger.info('Processing batch orders')
      const pendingOrders = await ctx.state.list<{ id: string }>('pending-orders')
      for (const order of pendingOrders) {
        await ctx.enqueue({
          topic: 'order.processed',
          data: { orderId: order.id },
        })
      }
      ctx.logger.info('Batch processing complete', { count: pendingOrders.length })
    },
  })
}
Long-Running Workflows
Handle workflows that span hours or days:
import type { Handlers, StepConfig } from 'motia'
// Start long-running process
/**
 * Step definition for `StartApprovalProcess`: triggered by HTTP
 * `POST /approvals`, declares `approval.reminder` as the topic it enqueues
 * to, and is part of the `approval-workflow` flow.
 */
export const startConfig = {
  name: 'StartApprovalProcess',
  triggers: [{ type: 'http', method: 'POST', path: '/approvals' }],
  enqueues: ['approval.reminder'],
  flows: ['approval-workflow'],
} as const satisfies StepConfig
/**
 * Opens an approval request: stores it as `pending` and enqueues an
 * `approval.reminder` message with a 24-hour delay.
 *
 * The request body is spread first and the server-controlled fields (`id`,
 * `status`, `requestedAt`) are written afterwards. Spreading the body last
 * would let a caller submit `{ "status": "approved" }` — or their own `id` —
 * and have it stored verbatim.
 *
 * @returns A 202 response with the new `approvalId` and status `pending`.
 */
export const startHandler: Handlers<typeof startConfig> = async (
  { request },
  { enqueue, state }
) => {
  const approvalId = crypto.randomUUID()

  await state.set('approvals', approvalId, {
    ...request.body,
    id: approvalId,
    status: 'pending',
    requestedAt: new Date().toISOString(),
  })

  // Schedule reminder for 24 hours later
  await enqueue({
    topic: 'approval.reminder',
    data: { approvalId },
    delay: 24 * 60 * 60 * 1000, // 24 hours
  })

  return { status: 202, body: { approvalId, status: 'pending' } }
}
// Reminder step (triggered after delay)
/**
 * Step definition for `ApprovalReminder`: triggered by messages on the
 * `approval.reminder` queue topic (which StartApprovalProcess enqueues with a
 * delay), declares `notification.send` as the topic it enqueues to, and is
 * part of the `approval-workflow` flow.
 */
export const reminderConfig = {
  name: 'ApprovalReminder',
  triggers: [{ type: 'queue', topic: 'approval.reminder' }],
  enqueues: ['notification.send'],
  flows: ['approval-workflow'],
} as const satisfies StepConfig
/**
 * Sends an `approval-reminder` notification to the approver, but only when
 * the approval record still exists and its status is `pending`.
 *
 * @param input Queue message; only `approvalId` is read.
 */
export const reminderHandler: Handlers<typeof reminderConfig> = async (input, { state, enqueue }) => {
  const approval = await state.get('approvals', input.approvalId)

  // Missing or already-decided approvals need no reminder.
  if (approval?.status !== 'pending') {
    return
  }

  await enqueue({
    topic: 'notification.send',
    data: {
      email: approval.approver,
      template: 'approval-reminder',
      data: approval,
    },
  })
}
Error Handling & Retries
Motia automatically retries failed steps (configurable in config.yaml):
/**
 * Posts the incoming message to an external API as JSON and treats any
 * non-2xx response as a failure.
 *
 * Failures are logged and rethrown rather than swallowed, so the error leaves
 * the handler and the step counts as failed.
 *
 * @throws The original error from `fetch`, or `Error('API error: <status>')`
 *   when the response is not OK.
 */
export const handler: Handlers<typeof config> = async (input, { logger }) => {
  try {
    // Attempt external API call
    const result = await fetch('https://api.example.com/process', {
      method: 'POST',
      // Without this header fetch labels a string body as text/plain.
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(input),
    })

    if (!result.ok) {
      throw new Error(`API error: ${result.status}`)
    }

    logger.info('Processing successful')
  } catch (error) {
    logger.error('Processing failed, will retry', { error })
    // Throw to trigger automatic retry
    throw error
  }
}
Dead Letter Queue
/**
 * Step definition for `ProcessWithDLQ`: triggered by messages on the
 * `data.process` queue topic, declares `data.failed` (the dead-letter topic
 * used by the handler) as the topic it enqueues to, and is part of the
 * `data-processing` flow.
 */
export const config = {
  name: 'ProcessWithDLQ',
  triggers: [{ type: 'queue', topic: 'data.process' }],
  enqueues: ['data.failed'],
  flows: ['data-processing'],
} as const satisfies StepConfig
/**
 * Processes a message and, once the message reports at least three retries,
 * parks the failure instead of failing again: the failure is stored under the
 * `failed-jobs` scope and published on `data.failed`.
 *
 * NOTE(review): `input.retryCount` is only read here, never incremented;
 * presumably the producer or runtime maintains it — confirm.
 *
 * @throws The processing error while `input.retryCount` is below 3.
 */
export const handler: Handlers<typeof config> = async (input, { logger, enqueue, state }) => {
  try {
    // Process data
    await processData(input)
  } catch (error) {
    const attempts = input.retryCount || 0

    // Retry limit not reached yet: rethrow so the failure leaves the handler.
    if (!(attempts >= 3)) {
      throw error
    }

    // Move to dead letter queue after max retries
    logger.error('Max retries exceeded, sending to DLQ', { error, input })

    const reason = String(error)
    await state.set('failed-jobs', crypto.randomUUID(), {
      input,
      error: reason,
      failedAt: new Date().toISOString(),
    })
    await enqueue({
      topic: 'data.failed',
      data: { input, error: reason },
    })
  }
}
Workflow Monitoring
Track workflow progress in real-time:
export const handler: Handlers<typeof config> = async ({ request }, { state, streams }) => {
const workflowId = crypto.randomUUID()
// Initialize workflow state
await state.set('workflows', workflowId, {
id: workflowId,
status: 'running',
steps: {
step1: 'pending',
step2: 'pending',
step3: 'pending',
},
startedAt: Date.now(),
})
// Also set in stream for real-time updates
await streams.workflows.set('active', workflowId, {
id: workflowId,
status: 'running',
progress: 0,
})
return { status: 202, body: { workflowId } }
}
// Update progress as steps complete
/**
 * Records one finished workflow step in two places: the step is marked
 * `completed` in the `workflows` state record, and the `progress` field of
 * the workflow's stream item is incremented by 33.
 *
 * @param input Message carrying `workflowId` and `stepName`.
 */
export const stepHandler: Handlers<typeof stepConfig> = async (input, { state, streams }) => {
  const { workflowId, stepName } = input
  const stepPath = `steps.${stepName}`

  // Update state
  await state.update('workflows', workflowId, [{ type: 'set', path: stepPath, value: 'completed' }])

  // Update stream for real-time dashboard
  await streams.workflows.update('active', workflowId, [
    { type: 'increment', path: 'progress', by: 33 },
  ])
}
Best Practices
1. Design for Idempotency
/**
 * Processes an order at most once per `orderId`: a marker under the
 * `processed` scope is checked before the work and written after it, so a
 * repeated delivery of the same message is logged and skipped.
 *
 * NOTE(review): the check and the marker write are separate calls; whether
 * two concurrent deliveries can both pass the check depends on the state
 * store — confirm if that matters for your workload.
 *
 * @param input Message; only `orderId` is read.
 */
export const handler: Handlers<typeof config> = async (input, { state, logger }) => {
  const { orderId } = input

  // Check if already processed
  const existing = await state.get('processed', orderId)
  if (existing) {
    logger.info('Already processed, skipping', { orderId })
    return
  }

  // Process order
  await processOrder(orderId)

  // Mark as processed
  await state.set('processed', orderId, { processedAt: Date.now() })
}
2. Use Meaningful Topic Names
// Good: clear, hierarchical — dot-separated names that read from the general
// area down to the specific event
await enqueue({ topic: 'user.created', data })
await enqueue({ topic: 'order.payment.validated', data })
await enqueue({ topic: 'inventory.stock.low', data })
// Avoid: vague, flat — single words that name neither the subject nor the event
await enqueue({ topic: 'process', data })
await enqueue({ topic: 'data', data })
3. Document Workflow Flows
// Example of a self-describing step config: `description` states what the
// step does in plain language, `flows` names the workflow it belongs to, and
// `enqueues` lists the topics it publishes to.
export const config = {
  name: 'ProcessOrder',
  description: 'Main order processing step. Validates payment, reserves inventory, creates shipment.',
  flows: ['order-workflow'],
  enqueues: ['payment.validate', 'inventory.reserve'],
  // ...
}
Next Steps
- Learn about State Management for workflow data
- Explore Streaming for real-time workflow updates
- Check out Observability for workflow monitoring