Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt

Use this file to discover all available pages before exploring further.

Installation

pip install iii-sdk

Quick Start

import asyncio
from iii import III

async def my_function(data):
    """Minimal example handler: ignore the input and report success."""
    response = {"result": "success"}
    return response

iii = III("ws://localhost:49134")
iii.register_function("my.function", my_function)

async def main():
    """Connect to the engine, invoke ``my.function`` once, and print its result."""
    await iii.connect()
    payload = {"param": "value"}
    print(await iii.call("my.function", payload))

asyncio.run(main())

Initialization

III()

Create a new III SDK instance.
address
str
required
WebSocket address of the III Engine (e.g., ws://localhost:49134)
options
InitOptions
Optional configuration options
from iii import III, InitOptions, TelemetryOptions

iii = III(
    "ws://localhost:49134",
    InitOptions(
        worker_name="my-worker",
        invocation_timeout_ms=60000,
        telemetry=TelemetryOptions(
            language="en",
            project_name="my-project",
            framework="fastapi"
        )
    )
)

connect()

Connect to the III Engine.
await iii.connect()

Function Registration

register_function()

Register a function that can be invoked by other services or triggers.
path
str
required
Unique function identifier (use :: for namespacing, e.g., external::my_lambda)
handler_or_invocation
callable | HttpInvocationConfig
required
Function handler (async function) or HTTP invocation config (external HTTP endpoint)
description
str
Human-readable function description
metadata
dict
Additional metadata for the function
return
FunctionRef
Function reference with id and unregister() method
# Local handler
async def my_function(data):
    """Local handler used in the registration example; always reports success."""
    outcome = {"result": "success"}
    return outcome

ref = iii.register_function("my.function", my_function, description="My function")

# HTTP invocation (external endpoint)
from iii import HttpInvocationConfig

iii.register_function(
    "external::lambda",
    HttpInvocationConfig(
        url="https://my-lambda.example.com/invoke",
        method="POST",
        timeout_ms=10000,
        headers={"X-Custom-Header": "value"},
    ),
    description="External Lambda function"
)

# Unregister later
ref.unregister()

Function Invocation

call()

Invoke a function and wait for its result. This is a coroutine and must be awaited.
path
str
required
ID of the function to invoke
data
any
required
Input data to pass to the function
timeout
float
default: 30.0
Timeout in seconds
return
Any
Function result
result = await iii.call("my.function", {"param": "value"})
print(result)

# With custom timeout
result = await iii.call("slow.function", {"data": "x"}, timeout=60.0)

call_void()

Invoke a function without waiting for a result (fire-and-forget). Unlike call(), the call is not awaited and returns no result.
path
str
required
ID of the function to invoke
data
any
required
Input data to pass to the function
iii.call_void("my.function", {"param": "value"})
# Function is invoked in the background, no result is returned

trigger()

Alias for call(). Invokes a function and waits for its result; must be awaited.
result = await iii.trigger("my.function", {"param": "value"})

trigger_void()

Alias for call_void(). Invokes a function without waiting for a result (fire-and-forget); it is not awaited.
iii.trigger_void("my.function", {"param": "value"})

Trigger Management

register_trigger()

Register a trigger to automatically invoke a function based on events.
type
str
required
Trigger type (e.g., http, schedule, custom types)
function_id
str
required
ID of the function to invoke when triggered
config
dict
required
Trigger-specific configuration (depends on trigger type)
return
Trigger
Trigger object with unregister() method
# HTTP trigger
trigger = iii.register_trigger(
    type="http",
    function_id="my.function",
    config={
        "api_path": "/hello",
        "http_method": "POST",
    }
)

# Unregister later
trigger.unregister()

register_trigger_type()

Register a custom trigger type with custom logic.
id
str
required
Unique trigger type identifier
description
str
required
Human-readable description
handler
TriggerHandler
required
Trigger type handler with register_trigger and unregister_trigger methods
from iii import TriggerHandler, TriggerConfig

class MyTriggerHandler(TriggerHandler):
    """Example handler for a custom trigger type.

    Both hooks only print the id of the trigger configuration they receive.
    """

    async def register_trigger(self, config: TriggerConfig):
        # Setup logic when a trigger of this type is registered
        message = f"Trigger registered: {config.id}"
        print(message)

    async def unregister_trigger(self, config: TriggerConfig):
        # Cleanup logic when a trigger is unregistered
        message = f"Trigger unregistered: {config.id}"
        print(message)

iii.register_trigger_type(
    "myTriggerType",
    "My custom trigger type",
    MyTriggerHandler()
)

unregister_trigger_type()

Unregister a custom trigger type.
id
str
required
Trigger type ID to unregister
iii.unregister_trigger_type("myTriggerType")

Service Registration

register_service()

Register a service for grouping related functions.
id
str
required
Service identifier
description
str
Service description
parent_id
str
Parent service ID for hierarchical organization
iii.register_service(
    "my-service",
    description="My service description"
)

Engine Functions

list_functions()

List all registered functions in the engine.
return
list[FunctionInfo]
List of function information objects
functions = await iii.list_functions()
for fn in functions:
    print(fn.function_id, fn.description)

list_workers()

List all connected workers in the engine.
return
list[WorkerInfo]
List of worker information objects
workers = await iii.list_workers()
for worker in workers:
    print(worker.name, worker.status, worker.function_count)

Channels

create_channel()

Create a streaming channel pair for worker-to-worker data transfer.
buffer_size
int
Optional buffer size for the channel (defaults to 64)
return
Channel
Channel object with writer, reader, writer_ref, and reader_ref
channel = await iii.create_channel(buffer_size=128)

# Write data
await channel.writer.write({"data": "hello"})
await channel.writer.close()

# Read data
async for item in channel.reader:
    print(item)

# Pass channel refs to other functions
await iii.call("processData", {
    "input": channel.reader_ref,
    "output": channel.writer_ref,
})

Event Listeners

on_functions_available()

Subscribe to function availability events.
callback
callable
required
Callback invoked when functions are registered/unregistered
def callback(functions: list[FunctionInfo]) -> None:
    ...
return
callable
Unsubscribe function
def on_functions_changed(functions):
    """Print the IDs of the functions passed to the availability callback."""
    message = f"Available functions: {[f.function_id for f in functions]}"
    print(message)

unsubscribe = iii.on_functions_available(on_functions_changed)

# Unsubscribe later
unsubscribe()

on_connection_state_change()

Monitor WebSocket connection state changes.
callback
callable
required
Callback invoked when connection state changes
def callback(state: str) -> None:
    ...
return
callable
Unsubscribe function
def on_state_change(state):
    """Print the connection state passed to the callback."""
    message = f"Connection state: {state}"
    print(message)

unsubscribe = iii.on_connection_state_change(on_state_change)

# Unsubscribe later
unsubscribe()

get_connection_state()

Get the current connection state.
return
str
Current state: disconnected, connecting, connected, reconnecting, or failed
state = iii.get_connection_state()
print(f"Current state: {state}")

Context & Logging

get_context()

Get the current execution context (available within function handlers).
return
Context
Context object with logger
from iii import get_context

async def my_function(data):
    """Example handler that logs at three levels through the execution context."""
    log = get_context().logger

    log.info("Processing request", extra={"input": data})
    log.warning("Something might be wrong")
    log.error("An error occurred", extra={"error": "details"})

    return {"result": "success"}

iii.register_function("my.function", my_function)

with_context()

Run a function with a custom context.
fn
callable
required
Function to execute. It is called with one argument (the example below ignores it).
context
Context
required
Context to use during execution
from iii import with_context, Context, Logger

logger = Logger(function_name="my-function")
ctx = Context(logger=logger)

await with_context(
    lambda _: some_function(),
    ctx
)

Lifecycle

shutdown()

Gracefully shut down the SDK, cleaning up all resources.
await iii.shutdown()

Properties

worker_id

Get the worker ID assigned by the engine.
return
str | None
Worker ID or None if not yet registered
worker_id = iii.worker_id
print(f"Worker ID: {worker_id}")

Types

FunctionInfo

from dataclasses import dataclass

@dataclass
class FunctionInfo:
    """Information about one function registered in the engine.

    Instances are the elements of the list returned by ``list_functions()`` and
    of the list passed to ``on_functions_available()`` callbacks.
    """

    function_id: str  # ID of the function
    description: str | None = None  # human-readable description, if any
    request_format: dict | None = None  # presumably a schema for the input — TODO confirm
    response_format: dict | None = None  # presumably a schema for the result — TODO confirm
    metadata: dict | None = None  # additional metadata for the function, if any

WorkerInfo

@dataclass
class WorkerInfo:
    """Information about one worker connected to the engine.

    Instances are the elements of the list returned by ``list_workers()``.
    All fields are required; the ``| None`` fields have no default value.
    """

    id: str  # worker identifier (presumably the engine-assigned worker ID — confirm)
    name: str | None  # worker name, or None
    runtime: str | None  # NOTE(review): presumably the language runtime — confirm
    version: str | None  # NOTE(review): unclear whether SDK or runtime version — confirm
    os: str | None
    ip_address: str | None
    status: str
    connected_at_ms: int  # connection time in milliseconds (presumably Unix epoch — confirm)
    function_count: int  # presumably the number of functions this worker registered — confirm
    functions: list[str]  # presumably the IDs of those functions — confirm
    active_invocations: int  # presumably invocations currently in progress — confirm

ApiRequest / ApiResponse

from iii import ApiRequest, ApiResponse

@dataclass
class ApiRequest:
    """Typed view of the input a handler receives from an HTTP trigger.

    The "Basic HTTP API" example builds it with ``ApiRequest(**data)``.
    All fields are required (none has a default).
    """

    path_params: dict[str, str]  # presumably values captured from the route path — confirm
    query_params: dict[str, str | list[str]]  # a value is a string or a list of strings
    body: Any
    headers: dict[str, str | list[str]]  # a value is a string or a list of strings
    method: str  # presumably the HTTP method, e.g. "GET" — confirm
    request_body: ChannelReader  # NOTE(review): presumably a streaming reader for the raw body — confirm

@dataclass
class ApiResponse:
    """Value returned by a handler invoked through an HTTP trigger.

    Only ``status`` is required; ``data`` and ``headers`` default to None.
    """

    status: int  # presumably the HTTP status code (the example uses 200) — confirm
    data: Any = None  # response payload (the example passes a dict)
    headers: dict[str, str] | None = None  # optional response headers

Examples

Basic HTTP API

import asyncio
from iii import III, ApiRequest, ApiResponse

iii = III("ws://localhost:49134")

async def hello(data):
    """Handle the HTTP trigger: parse the payload and reply with a fixed greeting."""
    # Typed view of the incoming payload; this minimal example does not read it further.
    request = ApiRequest(**data)
    greeting = {"message": "Hello, world!"}
    return ApiResponse(status=200, data=greeting)

iii.register_function("api.hello", hello)

async def main():
    """Connect, bind ``api.hello`` to an HTTP GET trigger on /hello, then keep running."""
    await iii.connect()

    http_config = {"api_path": "/hello", "http_method": "GET"}
    iii.register_trigger(type="http", function_id="api.hello", config=http_config)

    # Keep running: this Future is never resolved, so awaiting it does not return.
    forever = asyncio.Future()
    await forever

asyncio.run(main())

Calling Other Functions

async def process_order(order):
    """Charge the customer for an order, then request shipment without waiting.

    Returns a dict with ``success`` set to True and ``paymentId`` taken from
    the ``id`` of the payment result.
    """
    # Call payment service and wait for its result
    charge_request = {
        "amount": order["total"],
        "customer": order["customerId"],
    }
    payment = await iii.call("payment.charge", charge_request)

    # Notify warehouse asynchronously (fire-and-forget)
    shipment_request = {"orderId": order["id"]}
    iii.call_void("warehouse.ship", shipment_request)

    return {"success": True, "paymentId": payment["id"]}

iii.register_function("processOrder", process_order)

Streaming Data with Channels

async def process_stream(req):
    """Create a channel, hand its read side to ``processData``, and write 100 values.

    Returns ``{"status": "processing"}`` once the write side has been closed.
    """
    channel = await iii.create_channel()

    # Process in background: pass the reader reference without waiting for a result
    consumer_input = {"input": channel.reader_ref}
    iii.call_void("processData", consumer_input)

    # Write the values 0..99, then close the write side
    writer = channel.writer
    for value in range(100):
        await writer.write({"value": value})
    await writer.close()

    return {"status": "processing"}

iii.register_function("processStream", process_stream)

Build docs developers (and LLMs) love