Skip to main content

Overview

The III Engine is the high-performance Rust core that orchestrates Motia applications. It provides:
  • Function invocation - Call handlers across workers
  • Trigger management - Register and fire triggers
  • Queue processing - Asynchronous event routing
  • State management - Distributed key-value storage
  • Stream synchronization - Real-time data streams
  • WebSocket protocol - Communication between workers and engine
  • Telemetry - OpenTelemetry-based observability

Architecture

Core Components

Engine Struct

The main engine orchestrator:
pub struct Engine {
    pub worker_registry: Arc<WorkerRegistry>,
    pub functions: Arc<FunctionsRegistry>,
    pub trigger_registry: Arc<TriggerRegistry>,
    pub service_registry: Arc<ServicesRegistry>,
    pub invocations: Arc<InvocationHandler>,
    pub channel_manager: Arc<ChannelManager>,
}
Reference: engine/src/engine/mod.rs:142-150

Engine Trait

Defines the core engine operations:
pub trait EngineTrait: Send + Sync {
    async fn call(
        &self,
        function_id: &str,
        input: impl Serialize + Send,
    ) -> Result<Option<Value>, ErrorBody>;
    
    async fn register_trigger_type(&self, trigger_type: TriggerType);
    
    fn register_function(
        &self,
        request: RegisterFunctionRequest,
        handler: Box<dyn FunctionHandler + Send + Sync>,
    );
}
Reference: engine/src/engine/mod.rs:121-140

WebSocket Protocol

Workers communicate with the engine using a JSON-based WebSocket protocol.

Message Types

pub enum Message {
    // Trigger management
    RegisterTriggerType { id: String, description: String },
    RegisterTrigger { id: String, trigger_type: String, function_id: String, config: Value },
    UnregisterTrigger { id: String, trigger_type: Option<String> },
    TriggerRegistrationResult { id: String, trigger_type: String, function_id: String, error: Option<ErrorBody> },
    
    // Function management
    RegisterFunction { id: String, description: Option<String>, request_format: Option<Value>, response_format: Option<Value>, metadata: Option<Value>, invocation: Option<HttpInvocationRef> },
    UnregisterFunction { id: String },
    
    // Invocation
    InvokeFunction { invocation_id: Option<Uuid>, function_id: String, data: Value, traceparent: Option<String>, baggage: Option<String> },
    InvocationResult { invocation_id: Uuid, function_id: String, result: Option<Value>, error: Option<ErrorBody>, traceparent: Option<String>, baggage: Option<String> },
    
    // Service management
    RegisterService { id: String, name: String, description: Option<String> },
    
    // Health
    Ping,
    Pong,
    WorkerRegistered { worker_id: String },
}
Reference: engine/src/protocol.rs:33-107

Binary Frames

The engine supports binary frames with magic prefixes for telemetry:
  • OTLP (OTLP prefix) - Trace spans
  • MTRC (MTRC prefix) - Metrics
  • LOGS (LOGS prefix) - Log records
const OTLP_WS_PREFIX: &[u8] = b"OTLP";
const MTRC_WS_PREFIX: &[u8] = b"MTRC";
const LOGS_WS_PREFIX: &[u8] = b"LOGS";
Reference: engine/src/engine/mod.rs:35-39

Connection Lifecycle

1

Connection Established

Worker opens WebSocket connection to engine:
ws://localhost:8080/ws
2

Worker Registration

Engine assigns worker ID and sends confirmation:
{
  "type": "workerregistered",
  "worker_id": "550e8400-e29b-41d4-a716-446655440000"
}
3

Function Registration

Worker registers its functions:
{
  "type": "registerfunction",
  "id": "CreateTodo",
  "description": "Create a new todo item",
  "request_format": { "type": "object", ... },
  "response_format": { "type": "object", ... }
}
4

Trigger Registration

Worker registers triggers for its functions:
{
  "type": "registertrigger",
  "id": "http:POST:/todo",
  "trigger_type": "http",
  "function_id": "CreateTodo",
  "config": { "path": "/todo", "method": "POST" }
}
5

Invocation

Engine invokes function when trigger fires:
{
  "type": "invokefunction",
  "invocation_id": "123e4567-e89b-12d3-a456-426614174000",
  "function_id": "CreateTodo",
  "data": { "description": "Buy groceries" },
  "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
}
6

Result

Worker sends invocation result back:
{
  "type": "invocationresult",
  "invocation_id": "123e4567-e89b-12d3-a456-426614174000",
  "function_id": "CreateTodo",
  "result": { "id": "todo-123", "description": "Buy groceries" },
  "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
}
7

Cleanup

When worker disconnects:
  • Unregister all functions
  • Unregister all triggers
  • Halt pending invocations
  • Fire worker_disconnected trigger
Reference: engine/src/engine/mod.rs:645-799

Module System

The engine uses a modular architecture where each module provides specific functionality.

Module Trait

#[async_trait::async_trait]
pub trait Module: Send + Sync {
    fn id(&self) -> &str;
    fn name(&self) -> &str;
    async fn initialize(&self) -> anyhow::Result<()>;
    async fn shutdown(&self) -> anyhow::Result<()>;
}

Available Modules

Queue Module

Manages event-driven messaging with adapters for Redis, RabbitMQ, etc.Location: engine/src/modules/queue/

State Module

Provides distributed key-value storage with state change triggers.Location: engine/src/modules/state/

Stream Module

Enables real-time data synchronization via WebSocket connections.Location: engine/src/modules/stream/

HTTP Module

Exposes REST API endpoints and handles HTTP function invocations.Location: engine/src/modules/rest_api/

Cron Module

Schedules and executes time-based triggers.Location: engine/src/modules/cron/

Observability Module

Collects and exports traces, metrics, and logs.Location: engine/src/modules/observability/
Reference: engine/src/lib.rs:20-38

Queue Module

Handles asynchronous event processing.

Queue Adapter Trait

#[async_trait::async_trait]
pub trait QueueAdapter: Send + Sync + 'static {
    async fn enqueue(
        &self,
        topic: &str,
        data: Value,
        traceparent: Option<String>,
        baggage: Option<String>,
    );
    
    async fn subscribe(
        &self,
        topic: &str,
        id: &str,
        function_id: &str,
        condition_function_id: Option<String>,
        queue_config: Option<SubscriberQueueConfig>,
    );
    
    async fn unsubscribe(&self, topic: &str, id: &str);
    async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64>;
    async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64>;
}
Reference: engine/src/modules/queue/mod.rs:19-39

Redis Adapter

Default queue implementation using Redis:
  • Standard queues: Redis lists for FIFO processing
  • FIFO queues: Redis sorted sets with message group partitioning
  • Dead-letter queues: Failed messages stored for retry

State Module

Provides distributed state management.

State Operations

  • set: Store or update a value
  • get: Retrieve a value
  • update: Apply JSON Patch operations
  • delete: Remove a value
  • list: List all values in a group
  • clear: Clear all values in a group

State Triggers

State changes automatically fire triggers:
// When state changes...
state.set("users", "user-123", user_data).await;

// Triggers are fired...
engine.fire_triggers("state", json!({
    "type": "state",
    "group_id": "users",
    "item_id": "user-123",
    "old_value": old_value,
    "new_value": user_data,
})).await;
Reference: engine/src/modules/state/mod.rs

Stream Module

Enables real-time data synchronization.

Stream Operations

  • set: Create or update stream item
  • get: Retrieve stream item
  • update: Apply JSON Patch operations
  • delete: Remove stream item
  • list: List items in a group

Stream Events

Stream operations emit events to subscribed clients:
pub enum StreamEvent {
    Create { data: Value },
    Update { data: Value },
    Delete { data: Value },
}

WebSocket Synchronization

Clients subscribe to streams via WebSocket and receive real-time updates:
{
  "type": "stream",
  "timestamp": 1634567890000,
  "streamName": "todo",
  "groupId": "inbox",
  "id": "todo-123",
  "event": {
    "type": "create",
    "data": { "description": "Buy groceries" }
  }
}
Reference: engine/src/modules/stream/mod.rs

Invocation Handler

Manages function invocations with distributed tracing.

Invocation Flow

1

Create Invocation

Generate unique invocation ID:
let invocation_id = Uuid::new_v4();
2

Extract Trace Context

Parse W3C Trace Context headers:
let traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";
let baggage = "key1=value1,key2=value2";
3

Send InvokeFunction

Send message to worker:
Message::InvokeFunction {
    invocation_id: Some(invocation_id),
    function_id: function_id.to_string(),
    data: input,
    traceparent,
    baggage,
}
4

Wait for Result

Block until worker responds:
let (tx, rx) = oneshot::channel();
invocations.insert(invocation_id, tx);
let result = rx.await?;
5

Return Result

Return result or error to caller:
match result {
    Ok(value) => Ok(value),
    Err(error) => Err(error),
}
Reference: engine/src/engine/mod.rs:177-220

Telemetry System

The engine includes comprehensive observability via OpenTelemetry.

Trace Propagation

Traces follow W3C Trace Context specification:
// Extract from incoming request
let traceparent = extract_traceparent_from_headers(headers);
let baggage = extract_baggage_from_headers(headers);

// Inject into outgoing request
let ctx = tracing::Span::current().context();
let traceparent = inject_traceparent_from_context(&ctx);
let baggage = inject_baggage_from_context(&ctx);
Reference: engine/src/telemetry.rs

Span Creation

let span = tracing::info_span!(
    "handle_invocation",
    otel.name = %format!("handle_invocation {}", function_id),
    worker_id = %worker.id,
    function_id = %function_id,
    invocation_id = ?invocation_id,
    otel.kind = "server",
    otel.status_code = tracing::field::Empty,
)
.with_parent_headers(traceparent.as_deref(), baggage.as_deref());
Reference: engine/src/engine/mod.rs:315-324

Metrics Collection

The engine tracks:
  • Function registrations
  • Trigger registrations
  • Invocation counts
  • Error rates
  • Latencies

Worker Registry

Tracks connected workers and their capabilities.

Worker Struct

pub struct Worker {
    pub id: Uuid,
    pub channel: mpsc::Sender<Outbound>,
    pub invocations: Arc<RwLock<HashSet<Uuid>>>,
    pub function_ids: Arc<RwLock<HashSet<String>>>,
    pub external_function_ids: Arc<RwLock<HashSet<String>>>,
}

Worker Lifecycle

  1. Connection: Worker connects, receives ID
  2. Registration: Worker registers functions and triggers
  3. Ready: Worker processes invocations
  4. Disconnection: Worker disconnects, cleanup occurs

Function Registry

Stores registered functions and their handlers.
pub struct Function {
    pub handler: Arc<dyn Fn(Option<Uuid>, Value) -> BoxFuture<'static, FunctionResult<Option<Value>, ErrorBody>> + Send + Sync>,
    pub _function_id: String,
    pub _description: Option<String>,
    pub request_format: Option<Value>,
    pub response_format: Option<Value>,
    pub metadata: Option<Value>,
}

Trigger Registry

Manages trigger registrations and firing.
pub struct Trigger {
    pub id: String,
    pub trigger_type: String,
    pub function_id: String,
    pub config: Value,
    pub worker_id: Option<Uuid>,
}

Fire Triggers

When a trigger fires, the engine invokes all matching functions:
pub async fn fire_triggers(&self, trigger_type: &str, data: Value) {
    let triggers: Vec<Trigger> = self
        .trigger_registry
        .triggers
        .iter()
        .filter(|entry| entry.value().trigger_type == trigger_type)
        .map(|entry| entry.value().clone())
        .collect();
    
    for trigger in triggers {
        let function_id = trigger.function_id.clone();
        tokio::spawn(async move {
            self.call(&function_id, data.clone()).await
        });
    }
}
Reference: engine/src/engine/mod.rs:616-643

Performance Characteristics

  • Connection pooling reduces overhead
  • Binary frames for telemetry reduce serialization cost
  • Async I/O prevents blocking
  • Local invocations: ~1-5ms
  • Cross-worker invocations: ~5-20ms (depends on network)
  • Queue processing: ~10-50ms (depends on adapter)
  • 10,000+ invocations/second per engine
  • Horizontally scalable with multiple engines
  • Queue adapters support millions of messages/day
  • Engine: ~50-100MB RAM baseline
  • Per worker: ~20-50MB RAM
  • CPU: Scales with workload

Best Practices

Run the engine as a separate process from workers for isolation and scalability.
Reuse WebSocket connections across invocations to reduce overhead.
Use WebSocket compression for large payloads.
Track invocation latency, error rates, and queue depths.
Choose appropriate queue/state/stream adapters for your workload (Redis, RabbitMQ, PostgreSQL).

Next Steps

Self-Hosting

Deploy your own III Engine

Engine Modules

Configure queue, state, and stream adapters

Observability

Monitor engine performance

Configuration

Configure resource limits

Build docs developers (and LLMs) love