Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt

Use this file to discover all available pages before exploring further.

Engine Modules

The iii Engine provides a modular architecture where each module implements specific functionality. Modules are loaded from config.yaml and can be configured independently.

Module Architecture

All modules implement the Module trait:
#[async_trait]
pub trait Module: Send + Sync {
    fn name(&self) -> &'static str;
    async fn create(engine: Arc<Engine>, config: Option<Value>) -> anyhow::Result<Box<dyn Module>>;
    fn register_functions(&self, engine: Arc<Engine>);
    async fn initialize(&self) -> anyhow::Result<()>;
    async fn start_background_tasks(&self, shutdown: tokio::sync::watch::Receiver<bool>) -> anyhow::Result<()>;
    async fn destroy(&self) -> anyhow::Result<()>;
}

Core Modules

REST API Module

Class: modules::api::RestApiModule Purpose: Maps HTTP routes to functions via http triggers Location: src/modules/rest_api/ The REST API module provides HTTP endpoint routing:
  • Dynamic route registration based on triggers
  • Hot-reloadable routes (no restart required)
  • CORS support with configurable origins
  • Request timeout and concurrency limits
  • Path parameters and query strings
  • Request/response format validation
Configuration:
modules:
  - class: modules::api::RestApiModule
    config:
      port: 3111
      host: 127.0.0.1
      default_timeout: 30000  # milliseconds
      concurrency_request_limit: 1024
      cors:
        allowed_origins:
          - '*'  # or specific origins
        allowed_methods:
          - GET
          - POST
          - PUT
          - DELETE
          - OPTIONS
Key Features:
  • Hot Router: Routes are updated dynamically when triggers are registered
  • Path Router Registry: Maintains mapping of {METHOD}:{path} → function
  • Middleware Stack: Timeout, concurrency limiting, CORS
  • Dynamic Handler: Extracts request data and invokes functions
Implementation: src/modules/rest_api/api_core.rs Trigger registration creates HTTP routes:
{
  "type": "registertrigger",
  "trigger_type": "http",
  "function_id": "users.create",
  "config": {
    "api_path": "users",
    "http_method": "POST"
  }
}
This maps POST /usersusers.create function.

Queue Module

Class: modules::queue::QueueModule Purpose: Message queue with pub/sub, DLQ, and retry logic Location: src/modules/queue/ The Queue module provides reliable message processing:
  • Topic-based pub/sub
  • Dead Letter Queue (DLQ) for failed messages
  • Automatic retry with exponential backoff
  • Conditional subscriptions
  • Distributed tracing support
Configuration:
modules:
  - class: modules::queue::QueueModule
    config:
      adapter:
        class: modules::queue::RedisAdapter
        config:
          redis_url: redis://localhost:6379
Available Adapters:
  • modules::queue::RedisAdapter - Redis-backed queue (production)
  • modules::queue::RabbitMQAdapter - RabbitMQ with advanced topology
  • modules::queue::BuiltinAdapter - In-memory queue (development)
Key Features:
  • Enqueue Function: queue.enqueue - Publish messages to topics
  • Subscribe Trigger: queue trigger type - Subscribe functions to topics
  • DLQ Management: queue.redrive_dlq and queue.dlq_count functions
  • Trace Propagation: Automatic W3C traceparent/baggage forwarding
Implementation: src/modules/queue/queue.rs Example usage:
// Publish to queue
await iii.call('queue.enqueue', {
  topic: 'orders.created',
  data: { order_id: 123 }
});

// Subscribe to queue
iii.registerFunction({ id: 'orders.process' }, async (data) => {
  console.log('Processing order:', data.order_id);
});

iii.registerTrigger({
  type: 'queue',
  function_id: 'orders.process',
  config: { topic: 'orders.created' }
});
Queue Configuration Options:
interface SubscriberQueueConfig {
  max_retries?: number;        // Default: 3
  retry_delay_ms?: number;     // Default: 1000
  retry_backoff_factor?: number; // Default: 2.0
}

Cron Module

Class: modules::cron::CronModule Purpose: Distributed cron scheduling with lock coordination Location: src/modules/cron/ The Cron module provides scheduled function execution:
  • Standard cron expressions
  • Distributed locking (prevents duplicate execution)
  • Timezone support
  • Conditional execution
Configuration:
modules:
  - class: modules::cron::CronModule
    config:
      adapter:
        class: modules::cron::KvCronAdapter
        # or modules::cron::RedisAdapter
Available Adapters:
  • modules::cron::KvCronAdapter - File-based coordination
  • modules::cron::RedisAdapter - Redis-based coordination (multi-instance)
Key Features:
  • Cron Expressions: Standard 5-field cron syntax
  • Distributed Locks: Only one instance executes at a time
  • Graceful Shutdown: Cancels pending jobs on shutdown
Implementation: src/modules/cron/cron.rs Example usage:
iii.registerFunction({ id: 'reports.daily' }, async () => {
  console.log('Generating daily report');
});

iii.registerTrigger({
  type: 'cron',
  function_id: 'reports.daily',
  config: {
    expression: '0 9 * * *'  // 9am daily
  }
});
Cron Expression Format:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *

State Module

Class: modules::state::StateModule Purpose: Distributed key-value store with change triggers Location: src/modules/state/ The State module provides reactive state management:
  • Key-value storage
  • Watch keys for changes
  • Trigger functions on state changes
  • Atomic operations
Configuration:
modules:
  - class: modules::state::StateModule
    config:
      adapter:
        class: modules::state::adapters::KvStore
        config:
          store_method: file_based
          file_path: ./data/state_store.db
Available Adapters:
  • modules::state::adapters::KvStore - File-based or in-memory
  • modules::state::adapters::RedisAdapter - Redis-backed state
Key Features:
  • Get/Set/Delete: state.get, state.set, state.delete functions
  • Watch Trigger: state trigger type - React to state changes
  • Pattern Matching: Watch key patterns with wildcards
Implementation: src/modules/state/state.rs Example usage:
// Set state
await iii.call('state.set', {
  key: 'user:123:status',
  value: 'online'
});

// Watch state changes
iii.registerFunction({ id: 'users.on_status_change' }, async (data) => {
  console.log('User status changed:', data);
});

iii.registerTrigger({
  type: 'state',
  function_id: 'users.on_status_change',
  config: {
    key_pattern: 'user:*:status'
  }
});

Stream Module

Class: modules::stream::StreamModule Purpose: Real-time WebSocket pub/sub for client connections Location: src/modules/stream/ The Stream module enables real-time communication:
  • WebSocket-based pub/sub
  • Channel-based messaging
  • Client subscriptions
  • Presence tracking
Configuration:
modules:
  - class: modules::stream::StreamModule
    config:
      port: 3112
      host: 127.0.0.1
      adapter:
        class: modules::stream::adapters::RedisAdapter
        config:
          redis_url: redis://localhost:6379
Available Adapters:
  • modules::stream::adapters::RedisAdapter - Redis pub/sub (multi-instance)
  • modules::stream::adapters::KvStore - In-memory (single instance)
Key Features:
  • Channels: Named channels for topic-based messaging
  • Subscriptions: Clients subscribe to channels
  • Broadcast: stream.broadcast function publishes to channel
  • Presence: Track connected clients
Implementation: src/modules/stream/stream.rs Example usage:
// Broadcast to channel
await iii.call('stream.broadcast', {
  channel: 'chat:lobby',
  data: { user: 'Alice', message: 'Hello!' }
});
Clients connect via WebSocket on port 3112 and subscribe to channels.

Observability Module

Class: modules::observability::OtelModule Purpose: OpenTelemetry traces, metrics, and logs Location: src/modules/observability/ The Observability module provides production monitoring:
  • Distributed tracing (OTLP)
  • Metrics collection
  • Structured logging
  • Alert rules
Configuration:
modules:
  - class: modules::observability::OtelModule
    config:
      enabled: true
      service_name: iii
      service_version: 0.2.0
      exporter: otlp  # or 'memory' or 'both'
      endpoint: http://localhost:4317
      sampling_ratio: 1.0
      metrics_enabled: true
      logs_enabled: true
Key Features:
  • Trace Exporter: Export to OTLP collector (Jaeger, Tempo, etc.)
  • Metrics: Built-in metrics for invocations, errors, latency
  • Sampling: Configurable sampling with per-operation rules
  • Memory Storage: Query traces/metrics via functions
  • Alert Rules: Threshold-based alerts
Advanced Sampling:
sampling:
  default: 0.1
  parent_based: true
  rules:
    - operation: "health.*"
      rate: 0.01  # Sample 1% of health checks
    - operation: "api.critical.*"
      rate: 1.0   # Sample 100% of critical APIs
  rate_limit:
    max_traces_per_second: 100
Implementation: src/modules/observability/

Shell Module

Class: modules::shell::ExecModule Purpose: File watcher that runs commands on change Location: src/modules/shell/ The Shell module enables file-based automation:
  • Watch files/directories for changes
  • Execute shell commands on events
  • Glob pattern matching
Configuration:
modules:
  - class: modules::shell::ExecModule
    config:
      enabled: true
Trigger Example:
iii.registerTrigger({
  type: 'shell',
  function_id: 'build.run',
  config: {
    watch: 'src/**/*.ts',
    command: 'npm run build'
  }
});

Module Lifecycle

  1. Create: Module is instantiated with configuration
  2. Register Functions: Module registers its built-in functions
  3. Initialize: Module starts services (HTTP server, background tasks)
  4. Start Background Tasks: Long-running tasks with shutdown handling
  5. Destroy: Graceful cleanup on shutdown

Adding Custom Modules

Modules are registered via the inventory pattern:
inventory::submit! {
    ModuleRegistration {
        class: "modules::custom::MyModule",
        factory: |engine, config| {
            Box::pin(MyModule::create(engine, config))
        },
        is_default: false,
    }
}
See examples/custom_queue_adapter.rs for a complete example.

Next Steps

Build docs developers (and LLMs) love