Documentation Index
Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Installation
Add to your Cargo.toml:
[dependencies]
iii-sdk = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Quick Start
use iii_sdk::{III, Value};
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let iii = III::new("ws://127.0.0.1:49134");
iii.connect().await?;
iii.register_function("my.function", |input| async move {
Ok(json!({ "message": "Hello, world!", "input": input }))
});
let result: Value = iii
.call("my.function", json!({ "param": "value" }))
.await?;
println!("result: {result}");
Ok(())
}
Initialization
III::new()
Create a new III SDK instance with default worker metadata.
WebSocket address of the III Engine (e.g., ws://127.0.0.1:49134)
use iii_sdk::III;
let iii = III::new("ws://127.0.0.1:49134");
Create a new III SDK instance with custom worker metadata.
WebSocket address of the III Engine
use iii_sdk::{III, WorkerMetadata};
let metadata = WorkerMetadata {
runtime: "rust".to_string(),
version: "0.1.0".to_string(),
name: "my-worker".to_string(),
os: "linux".to_string(),
telemetry: None,
};
let iii = III::with_metadata("ws://127.0.0.1:49134", metadata);
Set custom worker metadata (call before connect()).
use iii_sdk::WorkerMetadata;
let metadata = WorkerMetadata::default();
iii.set_metadata(metadata);
set_otel_config()
Set OpenTelemetry configuration (requires otel feature, call before connect()).
OpenTelemetry configuration
#[cfg(feature = "otel")]
use iii_sdk::OtelConfig;
#[cfg(feature = "otel")]
iii.set_otel_config(OtelConfig {
enabled: true,
service_name: Some("my-service".to_string()),
engine_ws_url: None, // Auto-detected from III address
..Default::default()
});
connect()
Connect to the III Engine. This starts a background task that handles automatic reconnection.
Result indicating success or failure
Function Registration
register_function()
Register a function that can be invoked by other services or triggers.
id
impl Into<String>
required
Unique function identifier (use :: for namespacing, e.g., external::my_lambda)
handler
impl IntoFunctionHandler
required
Function handler (closure or HttpInvocationConfig)
Function reference with id and unregister() method
use serde_json::{json, Value};
use iii_sdk::IIIError;
// With closure
let ref1 = iii.register_function("my.function", |input: Value| async move {
Ok(json!({ "result": "success" }))
});
// With HTTP invocation config
use iii_sdk::HttpInvocationConfig;
let ref2 = iii.register_function(
"external::lambda",
HttpInvocationConfig {
url: "https://my-lambda.example.com/invoke".to_string(),
method: Some("POST".to_string()),
timeout_ms: Some(10000),
headers: None,
auth: None,
}
);
// Unregister later
ref1.unregister();
register_function_with_description()
Register a function with a description.
id
impl Into<String>
required
Function identifier
description
impl Into<String>
required
Human-readable description
handler
impl IntoFunctionHandler
required
Function handler
iii.register_function_with_description(
"my.function",
"Processes user data",
|input: Value| async move {
Ok(json!({ "result": "success" }))
}
);
register_function_with()
Register a function with full metadata.
message
RegisterFunctionMessage
required
Complete function registration message
handler
impl IntoFunctionHandler
required
Function handler
use iii_sdk::RegisterFunctionMessage;
let message = RegisterFunctionMessage {
id: "my.function".to_string(),
description: Some("My function".to_string()),
request_format: None,
response_format: None,
metadata: None,
invocation: None,
};
iii.register_function_with(message, |input: Value| async move {
Ok(json!({ "result": "success" }))
});
Function Invocation
call()
Invoke a function synchronously and wait for the result.
ID of the function to invoke
Input data to pass to the function
let result = iii.call("my.function", json!({ "param": "value" })).await?;
println!("Result: {result}");
call_with_timeout()
Invoke a function with a custom timeout.
ID of the function to invoke
Input data (must be serde_json::Value)
use std::time::Duration;
let result = iii.call_with_timeout(
"slow.function",
json!({ "data": "x" }),
Duration::from_secs(60)
).await?;
call_void()
Invoke a function asynchronously without waiting for a result (fire-and-forget).
ID of the function to invoke
Input data to pass to the function
Success or error (does not wait for function result)
iii.call_void("my.function", json!({ "param": "value" }))?;
// Function is invoked in the background
trigger()
Alias for call(). Invokes a function synchronously.
let result = iii.trigger("my.function", json!({ "param": "value" })).await?;
trigger_with_timeout()
Alias for call_with_timeout().
let result = iii.trigger_with_timeout(
"my.function",
json!({ "param": "value" }),
Duration::from_secs(60)
).await?;
trigger_void()
Alias for call_void().
iii.trigger_void("my.function", json!({ "param": "value" }))?;
Trigger Management
register_trigger()
Register a trigger to automatically invoke a function based on events.
trigger_type
impl Into<String>
required
Trigger type (e.g., http, schedule, custom types)
function_id
impl Into<String>
required
ID of the function to invoke when triggered
Trigger-specific configuration (depends on trigger type)
return
Result<Trigger, IIIError>
Trigger object with unregister() method
let trigger = iii.register_trigger(
"http",
"my.function",
json!({
"api_path": "/hello",
"http_method": "POST",
})
)?;
// Unregister later
trigger.unregister();
register_trigger_type()
Register a custom trigger type with custom logic.
id
impl Into<String>
required
Unique trigger type identifier
description
impl Into<String>
required
Human-readable description
handler
impl TriggerHandler + 'static
required
Trigger type handler
use iii_sdk::{TriggerHandler, TriggerConfig};
use async_trait::async_trait;
struct MyTriggerHandler;
#[async_trait]
impl TriggerHandler for MyTriggerHandler {
async fn register_trigger(&self, config: TriggerConfig) -> Result<(), IIIError> {
println!("Trigger registered: {}", config.id);
Ok(())
}
async fn unregister_trigger(&self, config: TriggerConfig) -> Result<(), IIIError> {
println!("Trigger unregistered: {}", config.id);
Ok(())
}
}
iii.register_trigger_type(
"myTriggerType",
"My custom trigger type",
MyTriggerHandler
);
unregister_trigger_type()
Unregister a custom trigger type.
id
impl Into<String>
required
Trigger type ID to unregister
iii.unregister_trigger_type("myTriggerType");
Service Registration
register_service()
Register a service for grouping related functions.
id
impl Into<String>
required
Service identifier
iii.register_service("my-service", Some("My service description".to_string()));
register_service_with_name()
Register a service with a custom display name.
id
impl Into<String>
required
Service identifier
name
impl Into<String>
required
Display name
iii.register_service_with_name(
"my-service",
"My Service",
Some("Service description".to_string())
);
Engine Functions
list_functions()
List all registered functions in the engine.
return
Result<Vec<FunctionInfo>, IIIError>
Vector of function information objects
let functions = iii.list_functions().await?;
for func in functions {
println!("{}: {:?}", func.function_id, func.description);
}
Event Listeners
on_functions_available()
Subscribe to function availability events.
callback
impl Fn(Vec<FunctionInfo>) + Send + Sync + 'static
required
Callback invoked when functions are registered/unregistered
Guard that unsubscribes when dropped
let _guard = iii.on_functions_available(|functions| {
println!("Available functions: {:?}",
functions.iter().map(|f| &f.function_id).collect::<Vec<_>>());
});
// Guard unsubscribes when dropped
Context & Logging
get_context()
Get the current execution context (available within function handlers).
Context object with logger and optionally span (OpenTelemetry span)
use iii_sdk::{get_context, Value};
iii.register_function("my.function", |input: Value| async move {
let ctx = get_context();
ctx.logger.info("Processing request", Some(&[("input", &input)]));
ctx.logger.warn("Something might be wrong", None);
ctx.logger.error("An error occurred", Some(&[("error", "details")]));
Ok(json!({ "result": "success" }))
});
with_context()
Run a function with a custom context.
Context to use during execution
f
impl FnOnce() -> Fut
required
Function to execute
use iii_sdk::{with_context, Context, Logger};
let logger = Logger::new(Some("my-function".to_string()));
let ctx = Context { logger, span: None };
let result = with_context(ctx, || async {
// Your code here
Ok(json!({ "result": "success" }))
}).await?;
Lifecycle
shutdown_async()
Gracefully shut down the SDK, flushing all telemetry data.
iii.shutdown_async().await;
shutdown() (deprecated)
Shut down the SDK without waiting for telemetry flush. Use shutdown_async() instead.
#[allow(deprecated)]
iii.shutdown();
Properties
address()
Get the engine WebSocket address.
let addr = iii.address();
println!("Connected to: {addr}");
Types
FunctionInfo
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunctionInfo {
pub function_id: String,
pub description: Option<String>,
pub request_format: Option<Value>,
pub response_format: Option<Value>,
pub metadata: Option<Value>,
}
WorkerInfo
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerInfo {
pub id: String,
pub name: Option<String>,
pub runtime: Option<String>,
pub version: Option<String>,
pub os: Option<String>,
pub ip_address: Option<String>,
pub status: String,
pub connected_at_ms: u64,
pub function_count: usize,
pub functions: Vec<String>,
pub active_invocations: usize,
}
IIIError
pub enum IIIError {
NotConnected,
Timeout,
SerializationError(serde_json::Error),
InvocationError(String),
Other(String),
}
Examples
Basic HTTP API
use iii_sdk::{III, Value};
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let iii = III::new("ws://127.0.0.1:49134");
iii.connect().await?;
iii.register_function("api.hello", |_input: Value| async move {
Ok(json!({
"status_code": 200,
"body": { "message": "Hello, world!" }
}))
});
iii.register_trigger(
"http",
"api.hello",
json!({ "api_path": "/hello", "http_method": "GET" })
)?;
// Keep running
tokio::signal::ctrl_c().await?;
iii.shutdown_async().await;
Ok(())
}
Calling Other Functions
use serde_json::{json, Value};
iii.register_function("process.order", |order: Value| async move {
// Call payment service
let payment = iii.call(
"payment.charge",
json!({
"amount": order["total"],
"customer": order["customerId"],
})
).await?;
// Notify warehouse asynchronously
iii.call_void("warehouse.ship", json!({ "orderId": order["id"] }))?;
Ok(json!({
"success": true,
"paymentId": payment["id"]
}))
});
Error Handling
use iii_sdk::IIIError;
iii.register_function("my.function", |input: Value| async move {
if input.get("error").is_some() {
return Err(IIIError::Other("Invalid input".to_string()));
}
Ok(json!({ "result": "success" }))
});
OpenTelemetry (with otel feature)
When the otel feature is enabled, the SDK provides additional tracing capabilities:
[dependencies]
iii-sdk = { version = "0.1", features = ["otel"] }
init_otel()
Initialize OpenTelemetry (called automatically by connect()).
#[cfg(feature = "otel")]
use iii_sdk::{init_otel, OtelConfig};
#[cfg(feature = "otel")]
init_otel(OtelConfig {
enabled: true,
service_name: Some("my-service".to_string()),
..Default::default()
}).await;
with_span()
Execute code within an OpenTelemetry span.
#[cfg(feature = "otel")]
use iii_sdk::{with_span, SpanKind};
#[cfg(feature = "otel")]
let result = with_span("my-operation", SpanKind::Internal, |span| async move {
span.set_attribute("custom.key", "value");
// Your code here
Ok(json!({ "result": "success" }))
}).await?;