Skip to main content

Observability

Motia provides built-in observability through OpenTelemetry integration, giving you full visibility into your application with distributed tracing, structured logging, and metrics.

Distributed Tracing

Every step execution generates a complete trace automatically:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'ProcessOrder',
  triggers: [{ type: 'http', method: 'POST', path: '/orders' }],
  enqueues: ['order.fulfill'],
  flows: ['orders'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { logger, enqueue, state }) => {
  const orderId = crypto.randomUUID()

  logger.info('Processing order', { orderId })

  // State operations are automatically traced
  await state.set('orders', orderId, {
    id: orderId,
    ...request.body,
    status: 'processing',
  })

  // Enqueue operations are traced
  await enqueue({
    topic: 'order.fulfill',
    data: { orderId },
  })

  logger.info('Order created', { orderId })

  return { status: 201, body: { orderId } }
}
The trace will show:
  • HTTP request span
  • Step execution span
  • State operations (set, get, update, delete)
  • Enqueue operations
  • Log entries correlated by trace ID

Structured Logging

Use the built-in logger for structured, searchable logs:
export const handler: Handlers<typeof config> = async ({ request }, { logger, traceId }) => {
  const { userId, action } = request.body

  // Info level logging
  logger.info('User action received', {
    userId,
    action,
    timestamp: Date.now(),
  })

  try {
    const result = await performAction(action)

    logger.info('Action completed successfully', {
      userId,
      action,
      result,
    })

    return { status: 200, body: { result } }
  } catch (error) {
    // Error logging with context
    logger.error('Action failed', {
      userId,
      action,
      error: error.message,
      stack: error.stack,
    })

    return { status: 500, body: { error: 'Action failed' } }
  }
}

Log Levels

export const handler: Handlers<typeof config> = async (_, { logger }) => {
  // Trace: Very detailed debugging
  logger.trace('Entering function', { step: 'validation' })

  // Debug: Detailed debugging information
  logger.debug('Processing data', { itemCount: 42 })

  // Info: General informational messages
  logger.info('Operation started', { operationId: '123' })

  // Warn: Warning messages for potentially harmful situations
  logger.warn('Rate limit approaching', { currentRate: 95, limit: 100 })

  // Error: Error events that might still allow the application to continue
  logger.error('Failed to fetch data', { error: 'timeout' })
}

Trace Correlation

All operations within a trace are automatically correlated:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'StartWorkflow',
  triggers: [{ type: 'http', method: 'POST', path: '/workflow' }],
  enqueues: ['step.one', 'step.two'],
  flows: ['workflow'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { logger, enqueue, traceId }) => {
  logger.info('Workflow started', { traceId })

  // These enqueued steps will share the same trace ID
  await Promise.all([
    enqueue({ topic: 'step.one', data: { traceId } }),
    enqueue({ topic: 'step.two', data: { traceId } }),
  ])

  return { status: 202, body: { traceId } }
}

// Downstream step - automatically part of the same trace
export const stepOneHandler: Handlers<typeof stepOneConfig> = async (input, { logger, traceId }) => {
  logger.info('Executing step one', { traceId })
  // This log will be correlated with the parent trace
}

Metrics

Motia automatically collects performance metrics:

Built-in Metrics

// Access metrics via the iii Console or API
{
  "engine_metrics": {
    "invocations": {
      "total": 1547,
      "success": 1523,
      "error": 24,
      "deferred": 0,
      "by_function": {
        "ProcessOrder": 856,
        "SendNotification": 691
      }
    },
    "workers": {
      "spawns": 12,
      "deaths": 0,
      "active": 12
    },
    "performance": {
      "avg_duration_ms": 125.4,
      "p50_duration_ms": 98.2,
      "p95_duration_ms": 287.5,
      "p99_duration_ms": 456.1,
      "min_duration_ms": 12.3,
      "max_duration_ms": 1234.5
    }
  }
}

Custom Metrics

Track custom application metrics:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'TrackBusinessMetrics',
  triggers: [{ type: 'queue', topic: 'order.completed' }],
  flows: ['metrics'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { state, logger }) => {
  const { orderId, amount, category } = input

  // Track revenue by category
  await state.update('metrics', `revenue-${category}`, [
    { type: 'increment', path: 'totalAmount', by: amount },
    { type: 'increment', path: 'orderCount', by: 1 },
  ])

  // Track daily totals
  const today = new Date().toISOString().split('T')[0]
  await state.update('metrics', `daily-${today}`, [
    { type: 'increment', path: 'revenue', by: amount },
    { type: 'increment', path: 'orders', by: 1 },
  ])

  logger.info('Business metrics updated', {
    category,
    amount,
    orderId,
  })
}

OpenTelemetry Configuration

Configure OTEL settings in config.yaml:
modules:
  otel:
    enabled: true
    service_name: motia-app
    service_namespace: production
    service_version: 1.0.0
    
    # Exporter configuration
    exporter: both  # Options: otlp, memory, both
    
    otlp:
      endpoint: http://localhost:4317
      protocol: grpc  # or http
      
    # Sampling configuration
    sampling:
      default: 1.0  # 100% sampling
      parent_based: true
      rules:
        - operation: "GET /health"
          rate: 0.1  # Sample 10% of health checks
        - operation: "ProcessOrder"
          rate: 1.0  # Always sample orders
    
    # Logs sampling
    logs_sampling_ratio: 1.0
    
    # Resource attributes
    resource_attributes:
      deployment.environment: production
      service.instance.id: server-01

iii Console

The built-in iii Console provides real-time observability:

Traces View

# Start your application
iii

# Console available at: http://localhost:3000
The Traces view shows:
  • Complete trace timelines
  • Span hierarchy and relationships
  • State operations per trace
  • Enqueue operations
  • Error details and stack traces

Logs View

Filter and search structured logs:
// All logs are automatically indexed and searchable
logger.info('User login', {
  userId: 'user-123',
  method: 'oauth',
  provider: 'google',
})

// Search in Console:
// - By trace ID
// - By severity (ERROR, WARN, INFO, DEBUG)
// - By time range
// - By custom attributes (userId, method, etc.)

State Inspector

View current state in real-time:
// State operations are visible in the console
await state.set('users', userId, userData)
await state.update('orders', orderId, [
  { type: 'increment', path: 'count', by: 1 },
])

// Inspect in Console:
// - All scopes and keys
// - Current values
// - Update history
// - Related traces

Monitoring Workflows

Track multi-step workflows:
import type { Handlers, StepConfig } from 'motia'

// Step 1: Start workflow
export const startConfig = {
  name: 'StartDataPipeline',
  triggers: [{ type: 'http', method: 'POST', path: '/pipeline' }],
  enqueues: ['pipeline.extract', 'pipeline.transform'],
  flows: ['data-pipeline'],
} as const satisfies StepConfig

export const startHandler: Handlers<typeof startConfig> = async (
  { request },
  { logger, enqueue, state, traceId }
) => {
  const pipelineId = crypto.randomUUID()

  logger.info('Starting data pipeline', {
    pipelineId,
    traceId,
    source: request.body.source,
  })

  // Initialize pipeline tracking
  await state.set('pipelines', pipelineId, {
    id: pipelineId,
    traceId,
    status: 'running',
    steps: {
      extract: 'pending',
      transform: 'pending',
      load: 'pending',
    },
    startedAt: Date.now(),
  })

  await enqueue({ topic: 'pipeline.extract', data: { pipelineId } })

  return { status: 202, body: { pipelineId, traceId } }
}

// Step 2: Extract data
export const extractHandler: Handlers<typeof extractConfig> = async (
  input,
  { logger, state, enqueue }
) => {
  const { pipelineId } = input

  logger.info('Extracting data', { pipelineId })

  try {
    const data = await extractData()

    await state.update('pipelines', pipelineId, [
      { type: 'set', path: 'steps.extract', value: 'completed' },
      { type: 'set', path: 'extractedCount', value: data.length },
    ])

    logger.info('Extraction completed', {
      pipelineId,
      recordCount: data.length,
    })

    await enqueue({ topic: 'pipeline.transform', data: { pipelineId, data } })
  } catch (error) {
    logger.error('Extraction failed', {
      pipelineId,
      error: error.message,
      stack: error.stack,
    })

    await state.update('pipelines', pipelineId, [
      { type: 'set', path: 'steps.extract', value: 'failed' },
      { type: 'set', path: 'status', value: 'failed' },
      { type: 'set', path: 'error', value: error.message },
    ])

    throw error
  }
}

Performance Monitoring

Track step performance:
export const handler: Handlers<typeof config> = async (input, { logger }) => {
  const startTime = Date.now()

  try {
    const result = await expensiveOperation(input)

    const duration = Date.now() - startTime

    logger.info('Operation completed', {
      duration,
      inputSize: input.length,
      outputSize: result.length,
    })

    // Alert if operation is slow
    if (duration > 5000) {
      logger.warn('Slow operation detected', {
        duration,
        threshold: 5000,
      })
    }

    return { status: 200, body: { result } }
  } catch (error) {
    const duration = Date.now() - startTime

    logger.error('Operation failed', {
      duration,
      error: error.message,
    })

    throw error
  }
}

Health Checks

Implement health check endpoints:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'HealthCheck',
  triggers: [{ type: 'http', method: 'GET', path: '/health' }],
  flows: ['system'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (_, { state, logger }) => {
  try {
    // Check database connectivity
    await state.get('health', 'check')

    // Check external dependencies
    const apiHealthy = await checkExternalAPI()

    if (!apiHealthy) {
      logger.warn('External API unhealthy')
      return {
        status: 503,
        body: {
          status: 'degraded',
          checks: {
            database: 'ok',
            externalAPI: 'down',
          },
        },
      }
    }

    return {
      status: 200,
      body: {
        status: 'healthy',
        timestamp: new Date().toISOString(),
        checks: {
          database: 'ok',
          externalAPI: 'ok',
        },
      },
    }
  } catch (error) {
    logger.error('Health check failed', { error })

    return {
      status: 503,
      body: {
        status: 'unhealthy',
        error: error.message,
      },
    }
  }
}

Alerts and Monitoring

Configure alerts in config.yaml:
modules:
  otel:
    alerts:
      - name: high_error_rate
        metric: invocations.error
        threshold: 10
        window: 60  # seconds
        severity: critical
        
      - name: slow_response_time
        metric: performance.p95_duration_ms
        threshold: 1000
        window: 300
        severity: warning
        
      - name: low_success_rate
        metric: invocations.success_rate
        threshold: 0.95
        operator: "<"
        window: 300
        severity: warning

Exporting to External Systems

Jaeger

modules:
  otel:
    enabled: true
    exporter: otlp
    otlp:
      endpoint: http://jaeger:4317
      protocol: grpc

Grafana + Tempo

modules:
  otel:
    enabled: true
    exporter: otlp
    otlp:
      endpoint: http://tempo:4317
      protocol: grpc
      headers:
        authorization: "Bearer ${GRAFANA_API_KEY}"

Datadog

modules:
  otel:
    enabled: true
    exporter: otlp
    otlp:
      endpoint: http://datadog-agent:4317
      protocol: grpc
    resource_attributes:
      deployment.environment: production
      service.name: motia-app

Best Practices

1. Use Structured Logging

// Good: Structured, searchable
logger.info('Order processed', {
  orderId: '123',
  amount: 99.99,
  userId: 'user-456',
})

// Avoid: Unstructured strings
logger.info(`Order 123 processed for $99.99 by user-456`)

2. Include Context in Logs

logger.error('Payment failed', {
  orderId,
  userId,
  amount,
  paymentMethod: 'credit_card',
  errorCode: error.code,
  errorMessage: error.message,
  timestamp: Date.now(),
})

3. Log at Appropriate Levels

// ERROR: Errors that need attention
logger.error('Database connection failed', { error })

// WARN: Unexpected but handled situations
logger.warn('Rate limit approaching', { currentRate: 95 })

// INFO: Important business events
logger.info('Order placed', { orderId, amount })

// DEBUG: Detailed debugging (disabled in production)
logger.debug('Processing item', { itemId, details })

4. Don’t Log Sensitive Data

// Good: Masked sensitive data
logger.info('Payment processed', {
  cardLast4: '4242',
  amount: 99.99,
})

// Avoid: Logging full card numbers, passwords, tokens
logger.info('Payment processed', {
  cardNumber: '4242424242424242', // DON'T DO THIS
})

Next Steps

Build docs developers (and LLMs) love