Observability
Motia provides built-in observability through OpenTelemetry integration, giving you full visibility into your application with distributed tracing, structured logging, and metrics.
Distributed Tracing
Every step execution generates a complete trace automatically:
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'ProcessOrder',
triggers: [{ type: 'http', method: 'POST', path: '/orders' }],
enqueues: ['order.fulfill'],
flows: ['orders'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { logger, enqueue, state }) => {
const orderId = crypto.randomUUID()
logger.info('Processing order', { orderId })
// State operations are automatically traced
await state.set('orders', orderId, {
id: orderId,
...request.body,
status: 'processing',
})
// Enqueue operations are traced
await enqueue({
topic: 'order.fulfill',
data: { orderId },
})
logger.info('Order created', { orderId })
return { status: 201, body: { orderId } }
}
The resulting trace includes:
- HTTP request span
- Step execution span
- State operations (set, get, update, delete)
- Enqueue operations
- Log entries correlated by trace ID
Structured Logging
Use the built-in logger for structured, searchable logs:
export const handler: Handlers<typeof config> = async ({ request }, { logger, traceId }) => {
const { userId, action } = request.body
// Info level logging
logger.info('User action received', {
userId,
action,
timestamp: Date.now(),
})
try {
const result = await performAction(action)
logger.info('Action completed successfully', {
userId,
action,
result,
})
return { status: 200, body: { result } }
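} catch (error) {
// `error` is `unknown` in strict TypeScript, so narrow it before reading its fields
const err = error instanceof Error ? error : new Error(String(error))
// Error logging with context
logger.error('Action failed', {
userId,
action,
error: err.message,
stack: err.stack,
})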
return { status: 500, body: { error: 'Action failed' } }
}
}
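To see why structured fields are searchable, it can help to picture each log call as a single JSON record. The following is a minimal sketch, not Motia's actual log format: `LogRecord` and `toLogLine` are hypothetical names used only for illustration.

```typescript
// Hypothetical record shape; Motia's real log format may differ.
type LogLevel = 'trace' | 'debug' | 'info' | 'warn' | 'error'

interface LogRecord {
  level: LogLevel
  message: string
  traceId: string
  [field: string]: unknown
}

// Serialize a message and its context fields into one JSON line.
function toLogLine(
  level: LogLevel,
  message: string,
  traceId: string,
  fields: Record<string, unknown> = {},
): string {
  // Spread the context first so it cannot overwrite the reserved keys.
  const record: LogRecord = { ...fields, level, message, traceId }
  return JSON.stringify(record)
}

const line = toLogLine('info', 'User action received', 'trace-abc', {
  userId: 'user-123',
  action: 'login',
})
```

Because every field is a separate key, a log backend can filter on `userId` or `traceId` directly instead of parsing message strings.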
Log Levels
export const handler: Handlers<typeof config> = async (_, { logger }) => {
// Trace: Very detailed debugging
logger.trace('Entering function', { step: 'validation' })
// Debug: Detailed debugging information
logger.debug('Processing data', { itemCount: 42 })
// Info: General informational messages
logger.info('Operation started', { operationId: '123' })
// Warn: Warning messages for potentially harmful situations
logger.warn('Rate limit approaching', { currentRate: 95, limit: 100 })
// Error: Error events that might still allow the application to continue
logger.error('Failed to fetch data', { error: 'timeout' })
}
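The levels above form an ordering from most to least verbose. How Motia configures the minimum level is not covered here; the sketch below only illustrates how a threshold over that ordering might behave, and `shouldLog` is a hypothetical helper.

```typescript
// Levels ordered from most to least verbose, matching the example above.
const LEVELS = ['trace', 'debug', 'info', 'warn', 'error'] as const
type Level = (typeof LEVELS)[number]

// Returns true when `level` is at or above the configured minimum.
function shouldLog(level: Level, minLevel: Level): boolean {
  return LEVELS.indexOf(level) >= LEVELS.indexOf(minLevel)
}
```

With a minimum of `info`, `trace` and `debug` calls would be dropped while `info`, `warn`, and `error` pass through.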
Trace Correlation
All operations within a trace are automatically correlated:
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'StartWorkflow',
triggers: [{ type: 'http', method: 'POST', path: '/workflow' }],
enqueues: ['step.one', 'step.two'],
flows: ['workflow'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { logger, enqueue, traceId }) => {
logger.info('Workflow started', { traceId })
// These enqueued steps will share the same trace ID
await Promise.all([
enqueue({ topic: 'step.one', data: { traceId } }),
enqueue({ topic: 'step.two', data: { traceId } }),
])
return { status: 202, body: { traceId } }
}
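// Downstream step - automatically part of the same trace
// Illustrative config for the 'step.one' topic; the step name is an assumption
export const stepOneConfig = {
name: 'StepOne',
triggers: [{ type: 'queue', topic: 'step.one' }],
flows: ['workflow'],
} as const satisfies StepConfig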
export const stepOneHandler: Handlers<typeof stepOneConfig> = async (input, { logger, traceId }) => {
logger.info('Executing step one', { traceId })
// This log will be correlated with the parent trace
}
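Correlation by trace ID is what lets you reassemble one workflow run from logs emitted by several steps. As a rough illustration, assuming log entries have been exported with a `traceId` field (the `LogEntry` shape and `groupByTrace` are hypothetical), grouping might look like this:

```typescript
// Hypothetical shape for an exported log entry.
interface LogEntry {
  traceId: string
  step: string
  message: string
}

// Group entries by trace ID, preserving their order within each trace.
function groupByTrace(entries: LogEntry[]): Record<string, LogEntry[]> {
  const groups: Record<string, LogEntry[]> = {}
  for (const entry of entries) {
    const bucket = groups[entry.traceId] ?? []
    bucket.push(entry)
    groups[entry.traceId] = bucket
  }
  return groups
}

const grouped = groupByTrace([
  { traceId: 't1', step: 'StartWorkflow', message: 'Workflow started' },
  { traceId: 't2', step: 'StartWorkflow', message: 'Workflow started' },
  { traceId: 't1', step: 'StepOne', message: 'Executing step one' },
])
```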
Metrics
Motia automatically collects performance metrics:
Built-in Metrics
// Access metrics via the iii Console or API
{
"engine_metrics": {
"invocations": {
"total": 1547,
"success": 1523,
"error": 24,
"deferred": 0,
"by_function": {
"ProcessOrder": 856,
"SendNotification": 691
}
},
"workers": {
"spawns": 12,
"deaths": 0,
"active": 12
},
"performance": {
"avg_duration_ms": 125.4,
"p50_duration_ms": 98.2,
"p95_duration_ms": 287.5,
"p99_duration_ms": 456.1,
"min_duration_ms": 12.3,
"max_duration_ms": 1234.5
}
}
}
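Raw counters like these are usually easier to reason about as rates. The sketch below derives success and error rates from the `invocations` block; the `InvocationCounts` type mirrors the fields in the sample above, and `invocationRates` is a hypothetical helper rather than part of Motia.

```typescript
// Mirrors the `invocations` counters in the sample payload.
interface InvocationCounts {
  total: number
  success: number
  error: number
  deferred: number
}

// Derive success and error rates, guarding against division by zero.
function invocationRates(counts: InvocationCounts): { successRate: number; errorRate: number } {
  if (counts.total === 0) return { successRate: 0, errorRate: 0 }
  return {
    successRate: counts.success / counts.total,
    errorRate: counts.error / counts.total,
  }
}

const rates = invocationRates({ total: 1547, success: 1523, error: 24, deferred: 0 })
```

For the sample counters this gives a success rate of roughly 98.4% and an error rate of roughly 1.6%.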
Custom Metrics
Track custom application metrics:
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'TrackBusinessMetrics',
triggers: [{ type: 'queue', topic: 'order.completed' }],
flows: ['metrics'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, { state, logger }) => {
const { orderId, amount, category } = input
// Track revenue by category
await state.update('metrics', `revenue-${category}`, [
{ type: 'increment', path: 'totalAmount', by: amount },
{ type: 'increment', path: 'orderCount', by: 1 },
])
// Track daily totals
const today = new Date().toISOString().split('T')[0]
await state.update('metrics', `daily-${today}`, [
{ type: 'increment', path: 'revenue', by: amount },
{ type: 'increment', path: 'orders', by: 1 },
])
logger.info('Business metrics updated', {
category,
amount,
orderId,
})
}
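One detail worth noting in the handler above is that `toISOString()` always reports UTC, so the daily bucket rolls over at midnight UTC rather than at local midnight. The following sketch factors the key construction into small helpers to make that explicit; `dailyMetricKey` and `revenueMetricKey` are hypothetical names.

```typescript
// Build the per-day key used above; toISOString() always reports UTC.
function dailyMetricKey(date: Date): string {
  return `daily-${date.toISOString().split('T')[0]}`
}

// Build the per-category revenue key.
function revenueMetricKey(category: string): string {
  return `revenue-${category}`
}

// 23:30 at UTC-5 is already the next day in UTC.
const lateEvening = dailyMetricKey(new Date('2024-03-05T23:30:00-05:00'))
```

If your reporting needs local-day buckets, you would need to convert the timestamp to the target time zone before formatting it.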
OpenTelemetry Configuration
Configure OTEL settings in config.yaml:
modules:
  otel:
    enabled: true
    service_name: motia-app
    service_namespace: production
    service_version: 1.0.0
    # Exporter configuration
    exporter: both # Options: otlp, memory, both
    otlp:
      endpoint: http://localhost:4317
      protocol: grpc # or http
    # Sampling configuration
    sampling:
      default: 1.0 # 100% sampling
      parent_based: true
      rules:
        - operation: "GET /health"
          rate: 0.1 # Sample 10% of health checks
        - operation: "ProcessOrder"
          rate: 1.0 # Always sample orders
    # Logs sampling
    logs_sampling_ratio: 1.0
    # Resource attributes
    resource_attributes:
      deployment.environment: production
      service.instance.id: server-01
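The sampling rules can be read as a lookup from operation name to sampling rate, with `default` as the fallback. The sketch below shows one plausible interpretation, assuming exact-match rules where the first match wins. That precedence is an assumption rather than documented behavior, and `resolveSampleRate` and `shouldSample` are hypothetical helpers.

```typescript
interface SamplingRule {
  operation: string
  rate: number
}

interface SamplingConfig {
  default: number
  rules: SamplingRule[]
}

// Assumed semantics: the first rule whose operation matches exactly wins,
// otherwise the default rate applies.
function resolveSampleRate(config: SamplingConfig, operation: string): number {
  const rule = config.rules.find((r) => r.operation === operation)
  return rule ? rule.rate : config.default
}

// Decide whether to keep a trace given a random draw in [0, 1).
function shouldSample(rate: number, draw: number = Math.random()): boolean {
  return draw < rate
}

const sampling: SamplingConfig = {
  default: 1.0,
  rules: [
    { operation: 'GET /health', rate: 0.1 },
    { operation: 'ProcessOrder', rate: 1.0 },
  ],
}
```

Under this reading, health checks are kept about one time in ten, while any operation without a rule falls back to the default rate.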
iii Console
The built-in iii Console provides real-time observability:
# Start your application
iii
# Console available at: http://localhost:3000
Traces View
- Complete trace timelines
- Span hierarchy and relationships
- State operations per trace
- Enqueue operations
- Error details and stack traces
Logs View
Filter and search structured logs:
// All logs are automatically indexed and searchable
logger.info('User login', {
userId: 'user-123',
method: 'oauth',
provider: 'google',
})
// Search in Console:
// - By trace ID
// - By severity (ERROR, WARN, INFO, DEBUG)
// - By time range
// - By custom attributes (userId, method, etc.)
State Inspector
View current state in real-time:
// State operations are visible in the console
await state.set('users', userId, userData)
await state.update('orders', orderId, [
{ type: 'increment', path: 'count', by: 1 },
])
// Inspect in Console:
// - All scopes and keys
// - Current values
// - Update history
// - Related traces
Monitoring Workflows
Track multi-step workflows:
import type { Handlers, StepConfig } from 'motia'
// Step 1: Start workflow
export const startConfig = {
name: 'StartDataPipeline',
triggers: [{ type: 'http', method: 'POST', path: '/pipeline' }],
enqueues: ['pipeline.extract', 'pipeline.transform'],
flows: ['data-pipeline'],
} as const satisfies StepConfig
export const startHandler: Handlers<typeof startConfig> = async (
{ request },
{ logger, enqueue, state, traceId }
) => {
const pipelineId = crypto.randomUUID()
logger.info('Starting data pipeline', {
pipelineId,
traceId,
source: request.body.source,
})
// Initialize pipeline tracking
await state.set('pipelines', pipelineId, {
id: pipelineId,
traceId,
status: 'running',
steps: {
extract: 'pending',
transform: 'pending',
load: 'pending',
},
startedAt: Date.now(),
})
await enqueue({ topic: 'pipeline.extract', data: { pipelineId } })
return { status: 202, body: { pipelineId, traceId } }
}
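// Step 2: Extract data
// Illustrative config for the 'pipeline.extract' topic; the step name is an assumption
export const extractConfig = {
name: 'ExtractData',
triggers: [{ type: 'queue', topic: 'pipeline.extract' }],
enqueues: ['pipeline.transform'],
flows: ['data-pipeline'],
} as const satisfies StepConfig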
export const extractHandler: Handlers<typeof extractConfig> = async (
input,
{ logger, state, enqueue }
) => {
const { pipelineId } = input
logger.info('Extracting data', { pipelineId })
try {
const data = await extractData()
await state.update('pipelines', pipelineId, [
{ type: 'set', path: 'steps.extract', value: 'completed' },
{ type: 'set', path: 'extractedCount', value: data.length },
])
logger.info('Extraction completed', {
pipelineId,
recordCount: data.length,
})
await enqueue({ topic: 'pipeline.transform', data: { pipelineId, data } })
} catch (error) {
// `error` is `unknown` in strict TypeScript, so narrow it before reading its fields
const err = error instanceof Error ? error : new Error(String(error))
logger.error('Extraction failed', {
pipelineId,
error: err.message,
stack: err.stack,
})
await state.update('pipelines', pipelineId, [
{ type: 'set', path: 'steps.extract', value: 'failed' },
{ type: 'set', path: 'status', value: 'failed' },
{ type: 'set', path: 'error', value: err.message },
])
throw error
}
}
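The per-step statuses stored under `steps` make it possible to derive the overall pipeline status instead of updating it by hand in every handler. A minimal sketch, assuming the three status values used above (`pipelineStatus` is a hypothetical helper):

```typescript
type StepStatus = 'pending' | 'completed' | 'failed'
type PipelineStatus = 'running' | 'completed' | 'failed'

// Derive the overall status from the per-step statuses stored in state.
function pipelineStatus(steps: Record<string, StepStatus>): PipelineStatus {
  const values = Object.keys(steps).map((name) => steps[name])
  // Any failed step fails the whole pipeline.
  if (values.some((s) => s === 'failed')) return 'failed'
  // The pipeline completes only once every step has completed.
  if (values.length > 0 && values.every((s) => s === 'completed')) return 'completed'
  return 'running'
}
```

A dashboard or status endpoint could call this on the stored record rather than trusting a separately maintained `status` field.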
Performance Monitoring
Track step performance:
export const handler: Handlers<typeof config> = async (input, { logger }) => {
const startTime = Date.now()
try {
const result = await expensiveOperation(input)
const duration = Date.now() - startTime
logger.info('Operation completed', {
duration,
inputSize: input.length,
outputSize: result.length,
})
// Alert if operation is slow
if (duration > 5000) {
logger.warn('Slow operation detected', {
duration,
threshold: 5000,
})
}
return { status: 200, body: { result } }
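} catch (error) {
const duration = Date.now() - startTime
logger.error('Operation failed', {
duration,
// `error` is `unknown` in strict TypeScript, so narrow it before reading `message`
error: error instanceof Error ? error.message : String(error),
})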
throw error
}
}
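The handler above computes the elapsed time in both the success and the failure path. One way to avoid the duplication is a small timer helper with an injectable clock, which also makes the slow-operation check easy to test. This is a sketch only; `startTimer` and `isSlow` are hypothetical helpers, and the 5000 ms default simply mirrors the threshold used above.

```typescript
// Start a timer and return a function that reports elapsed milliseconds.
// The clock is injectable so the helper can be tested deterministically.
function startTimer(now: () => number = Date.now): () => number {
  const startedAt = now()
  return () => now() - startedAt
}

// Classify a duration against a slow-operation threshold.
function isSlow(durationMs: number, thresholdMs: number = 5000): boolean {
  return durationMs > thresholdMs
}

// Usage with a fake clock: 6000 ms "elapse" between start and read.
let fakeNow = 1000
const elapsed = startTimer(() => fakeNow)
fakeNow = 7000
const measured = elapsed()
```

In a handler you would call `startTimer()` once at the top and read `elapsed()` wherever a duration is logged.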
Health Checks
Implement health check endpoints:
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'HealthCheck',
triggers: [{ type: 'http', method: 'GET', path: '/health' }],
flows: ['system'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (_, { state, logger }) => {
try {
// Check database connectivity
await state.get('health', 'check')
// Check external dependencies
const apiHealthy = await checkExternalAPI()
if (!apiHealthy) {
logger.warn('External API unhealthy')
return {
status: 503,
body: {
status: 'degraded',
checks: {
database: 'ok',
externalAPI: 'down',
},
},
}
}
return {
status: 200,
body: {
status: 'healthy',
timestamp: new Date().toISOString(),
checks: {
database: 'ok',
externalAPI: 'ok',
},
},
}
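} catch (error) {
// `error` is `unknown` in strict TypeScript, so narrow it before reading `message`
const message = error instanceof Error ? error.message : String(error)
logger.error('Health check failed', { error: message })
return {
status: 503,
body: {
status: 'unhealthy',
error: message,
},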
}
}
}
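As the number of dependencies grows, hand-writing one response per failure case becomes unwieldy. The sketch below collapses a map of check results into the same response shape used above; `summarizeHealth` and the `CheckResult` type are hypothetical, and the mapping of any failed check to 503 follows the example rather than a Motia requirement.

```typescript
type CheckResult = 'ok' | 'down'

interface HealthSummary {
  status: 200 | 503
  body: { status: 'healthy' | 'degraded'; checks: Record<string, CheckResult> }
}

// Report healthy only when every individual check passed.
function summarizeHealth(checks: Record<string, CheckResult>): HealthSummary {
  const allOk = Object.keys(checks).every((name) => checks[name] === 'ok')
  return {
    status: allOk ? 200 : 503,
    body: { status: allOk ? 'healthy' : 'degraded', checks },
  }
}

const degraded = summarizeHealth({ database: 'ok', externalAPI: 'down' })
```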
Alerts and Monitoring
Configure alerts in config.yaml:
modules:
  otel:
    alerts:
      - name: high_error_rate
        metric: invocations.error
        threshold: 10
        window: 60 # seconds
        severity: critical
      - name: slow_response_time
        metric: performance.p95_duration_ms
        threshold: 1000
        window: 300
        severity: warning
      - name: low_success_rate
        metric: invocations.success_rate
        threshold: 0.95
        operator: "<"
        window: 300
        severity: warning
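Only the last rule sets an `operator`, which suggests that the comparison defaults to "greater than" when it is omitted. The sketch below illustrates rule evaluation under that assumption; the default operator, the `AlertRule` type, and `isTriggered` are not confirmed by the configuration reference and are shown for illustration only.

```typescript
type Operator = '>' | '<'

interface AlertRule {
  name: string
  metric: string
  threshold: number
  operator?: Operator // assumed to default to '>' when omitted
  window: number // seconds
  severity: 'warning' | 'critical'
}

// Compare a metric value (already aggregated over the window) to the threshold.
function isTriggered(rule: AlertRule, value: number): boolean {
  const operator = rule.operator ?? '>'
  return operator === '<' ? value < rule.threshold : value > rule.threshold
}

const lowSuccessRate: AlertRule = {
  name: 'low_success_rate',
  metric: 'invocations.success_rate',
  threshold: 0.95,
  operator: '<',
  window: 300,
  severity: 'warning',
}
```

Under this reading, `low_success_rate` fires when the success rate drops below 0.95, while `high_error_rate` fires when the error count exceeds 10 within the window.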
Exporting to External Systems
Jaeger
modules:
  otel:
    enabled: true
    exporter: otlp
    otlp:
      endpoint: http://jaeger:4317
      protocol: grpc
Grafana + Tempo
modules:
  otel:
    enabled: true
    exporter: otlp
    otlp:
      endpoint: http://tempo:4317
      protocol: grpc
      headers:
        authorization: "Bearer ${GRAFANA_API_KEY}"
Datadog
modules:
  otel:
    enabled: true
    exporter: otlp
    otlp:
      endpoint: http://datadog-agent:4317
      protocol: grpc
    resource_attributes:
      deployment.environment: production
      service.name: motia-app
Best Practices
1. Use Structured Logging
// Good: Structured, searchable
logger.info('Order processed', {
orderId: '123',
amount: 99.99,
userId: 'user-456',
})
// Avoid: Unstructured strings
logger.info(`Order 123 processed for $99.99 by user-456`)
2. Include Context in Logs
logger.error('Payment failed', {
orderId,
userId,
amount,
paymentMethod: 'credit_card',
errorCode: error.code,
errorMessage: error.message,
timestamp: Date.now(),
})
3. Log at Appropriate Levels
// ERROR: Errors that need attention
logger.error('Database connection failed', { error })
// WARN: Unexpected but handled situations
logger.warn('Rate limit approaching', { currentRate: 95 })
// INFO: Important business events
logger.info('Order placed', { orderId, amount })
// DEBUG: Detailed debugging (disabled in production)
logger.debug('Processing item', { itemId, details })
4. Don’t Log Sensitive Data
// Good: Masked sensitive data
logger.info('Payment processed', {
cardLast4: '4242',
amount: 99.99,
})
// Avoid: Logging full card numbers, passwords, tokens
logger.info('Payment processed', {
cardNumber: '4242424242424242', // DON'T DO THIS
})
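Masking by hand at every call site is easy to forget. One option is to pass log fields through a small redaction helper first. The sketch below is one possible approach, not a Motia feature: `SENSITIVE_KEYS`, `redactFields`, and `cardLast4` are hypothetical, and the key list would need to match your own data.

```typescript
// Keys treated as sensitive in this sketch; extend the list for your own data.
const SENSITIVE_KEYS = ['cardNumber', 'password', 'token']

// Replace the values of sensitive keys before the fields reach the logger.
function redactFields(fields: Record<string, unknown>): Record<string, unknown> {
  const safe: Record<string, unknown> = {}
  for (const key of Object.keys(fields)) {
    safe[key] = SENSITIVE_KEYS.indexOf(key) >= 0 ? '[REDACTED]' : fields[key]
  }
  return safe
}

// Keep only the last four digits of a card number.
function cardLast4(cardNumber: string): string {
  return cardNumber.slice(-4)
}

const safeFields = redactFields({ cardNumber: '4242424242424242', amount: 99.99 })
```

This helper only inspects top-level keys, so nested objects would need a recursive variant.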
Next Steps
- Learn about State Management for persisting data
- Explore Workflows for building complex flows
- Check out Streaming for real-time monitoring