Documentation Index Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Installation
Quick Start
import asyncio
from iii import III
async def my_function ( data ):
return { "result" : "success" }
iii = III( "ws://localhost:49134" )
iii.register_function( "my.function" , my_function)
async def main ():
await iii.connect()
result = await iii.call( "my.function" , { "param" : "value" })
print (result)
asyncio.run(main())
Initialization
III()
Create a new III SDK instance.
WebSocket address of the III Engine (e.g., ws://localhost:49134)
Optional configuration options Custom worker name (defaults to hostname:pid)
Enable automatic metrics reporting to the engine
Default timeout for function invocations in milliseconds
WebSocket reconnection behavior configuration Initial delay before first reconnection attempt
Maximum delay between reconnection attempts
Exponential backoff multiplier
Random jitter factor (0-1)
Maximum retry attempts (-1 for infinite)
Additional telemetry metadata Project name for analytics
Framework name (e.g., “fastapi”, “django”)
Amplitude API key for analytics
from iii import III , InitOptions, TelemetryOptions
iii = III(
"ws://localhost:49134" ,
InitOptions(
worker_name = "my-worker" ,
invocation_timeout_ms = 60000 ,
telemetry = TelemetryOptions(
language = "en" ,
project_name = "my-project" ,
framework = "fastapi"
)
)
)
connect()
Connect to the III Engine.
Function Registration
register_function()
Register a function that can be invoked by other services or triggers.
Unique function identifier (use :: for namespacing, e.g., external::my_lambda)
handler_or_invocation
callable | HttpInvocationConfig
required
Function handler (async function) or HTTP invocation config (external HTTP endpoint)
Human-readable function description
Additional metadata for the function
Function reference with id and unregister() method
# Local handler
async def my_function ( data ):
return { "result" : "success" }
ref = iii.register_function( "my.function" , my_function, description = "My function" )
# HTTP invocation (external endpoint)
from iii import HttpInvocationConfig
iii.register_function(
"external::lambda" ,
HttpInvocationConfig(
url = "https://my-lambda.example.com/invoke" ,
method = "POST" ,
timeout_ms = 10000 ,
headers = { "X-Custom-Header" : "value" },
),
description = "External Lambda function"
)
# Unregister later
ref.unregister()
Function Invocation
call()
Invoke a function synchronously and wait for the result.
ID of the function to invoke
Input data to pass to the function
result = await iii.call( "my.function" , { "param" : "value" })
print (result)
# With custom timeout
result = await iii.call( "slow.function" , { "data" : "x" }, timeout = 60.0 )
call_void()
Invoke a function asynchronously without waiting for a result (fire-and-forget).
ID of the function to invoke
Input data to pass to the function
iii.call_void( "my.function" , { "param" : "value" })
# Function is invoked in the background, no result is returned
trigger()
Alias for call(). Invokes a function synchronously.
result = await iii.trigger( "my.function" , { "param" : "value" })
trigger_void()
Alias for call_void(). Invokes a function asynchronously.
iii.trigger_void( "my.function" , { "param" : "value" })
Trigger Management
register_trigger()
Register a trigger to automatically invoke a function based on events.
Trigger type (e.g., http, schedule, custom types)
ID of the function to invoke when triggered
Trigger-specific configuration (depends on trigger type)
Trigger object with unregister() method
# HTTP trigger
trigger = iii.register_trigger(
type = "http" ,
function_id = "my.function" ,
config = {
"api_path" : "/hello" ,
"http_method" : "POST" ,
}
)
# Unregister later
trigger.unregister()
register_trigger_type()
Register a custom trigger type with custom logic.
Unique trigger type identifier
Human-readable description
Trigger type handler with register_trigger and unregister_trigger methods
from iii import TriggerHandler, TriggerConfig
class MyTriggerHandler ( TriggerHandler ):
async def register_trigger ( self , config : TriggerConfig):
# Setup logic when a trigger of this type is registered
print ( f "Trigger registered: { config.id } " )
async def unregister_trigger ( self , config : TriggerConfig):
# Cleanup logic when a trigger is unregistered
print ( f "Trigger unregistered: { config.id } " )
iii.register_trigger_type(
"myTriggerType" ,
"My custom trigger type" ,
MyTriggerHandler()
)
unregister_trigger_type()
Unregister a custom trigger type.
Trigger type ID to unregister
iii.unregister_trigger_type( "myTriggerType" )
Service Registration
register_service()
Register a service for grouping related functions.
Parent service ID for hierarchical organization
iii.register_service(
"my-service" ,
description = "My service description"
)
Engine Functions
list_functions()
List all registered functions in the engine.
List of function information objects
functions = await iii.list_functions()
for fn in functions:
print (fn.function_id, fn.description)
list_workers()
List all connected workers in the engine.
List of worker information objects
workers = await iii.list_workers()
for worker in workers:
print (worker.name, worker.status, worker.function_count)
Channels
create_channel()
Create a streaming channel pair for worker-to-worker data transfer.
Optional buffer size for the channel (defaults to 64)
Channel object with writer, reader, writer_ref, and reader_ref
channel = await iii.create_channel( buffer_size = 128 )
# Write data
await channel.writer.write({ "data" : "hello" })
await channel.writer.close()
# Read data
async for item in channel.reader:
print (item)
# Pass channel refs to other functions
await iii.call( "processData" , {
"input" : channel.reader_ref,
"output" : channel.writer_ref,
})
Event Listeners
on_functions_available()
Subscribe to function availability events.
Callback invoked when functions are registered/unregistered def callback ( functions : list[FunctionInfo]) -> None :
...
def on_functions_changed ( functions ):
print ( f "Available functions: { [f.function_id for f in functions] } " )
unsubscribe = iii.on_functions_available(on_functions_changed)
# Unsubscribe later
unsubscribe()
on_connection_state_change()
Monitor WebSocket connection state changes.
Callback invoked when connection state changes def callback ( state : str ) -> None :
...
def on_state_change ( state ):
print ( f "Connection state: { state } " )
unsubscribe = iii.on_connection_state_change(on_state_change)
# Unsubscribe later
unsubscribe()
get_connection_state()
Get the current connection state.
Current state: disconnected, connecting, connected, reconnecting, or failed
state = iii.get_connection_state()
print ( f "Current state: { state } " )
Context & Logging
get_context()
Get the current execution context (available within function handlers).
Context object with logger
from iii import get_context
async def my_function ( data ):
ctx = get_context()
ctx.logger.info( "Processing request" , extra = { "input" : data})
ctx.logger.warning( "Something might be wrong" )
ctx.logger.error( "An error occurred" , extra = { "error" : "details" })
return { "result" : "success" }
iii.register_function( "my.function" , my_function)
with_context()
Run a function with a custom context.
Context to use during execution
from iii import with_context, Context, Logger
logger = Logger( function_name = "my-function" )
ctx = Context( logger = logger)
await with_context(
lambda _ : some_function(),
ctx
)
Lifecycle
shutdown()
Gracefully shut down the SDK, cleaning up all resources.
Properties
worker_id
Get the worker ID assigned by the engine.
Worker ID or None if not yet registered
worker_id = iii.worker_id
print ( f "Worker ID: { worker_id } " )
Types
FunctionInfo
from dataclasses import dataclass
@dataclass
class FunctionInfo :
function_id: str
description: str | None = None
request_format: dict | None = None
response_format: dict | None = None
metadata: dict | None = None
WorkerInfo
@dataclass
class WorkerInfo :
id : str
name: str | None
runtime: str | None
version: str | None
os: str | None
ip_address: str | None
status: str
connected_at_ms: int
function_count: int
functions: list[ str ]
active_invocations: int
ApiRequest / ApiResponse
from iii import ApiRequest, ApiResponse
@dataclass
class ApiRequest :
path_params: dict[ str , str ]
query_params: dict[ str , str | list[ str ]]
body: Any
headers: dict[ str , str | list[ str ]]
method: str
request_body: ChannelReader
@dataclass
class ApiResponse :
status: int
data: Any = None
headers: dict[ str , str ] | None = None
Examples
Basic HTTP API
import asyncio
from iii import III , ApiRequest, ApiResponse
iii = III( "ws://localhost:49134" )
async def hello ( data ):
req = ApiRequest( ** data)
return ApiResponse( status = 200 , data = { "message" : "Hello, world!" })
iii.register_function( "api.hello" , hello)
async def main ():
await iii.connect()
iii.register_trigger(
type = "http" ,
function_id = "api.hello" ,
config = { "api_path" : "/hello" , "http_method" : "GET" }
)
# Keep running
await asyncio.Future()
asyncio.run(main())
Calling Other Functions
async def process_order ( order ):
# Call payment service
payment = await iii.call( "payment.charge" , {
"amount" : order[ "total" ],
"customer" : order[ "customerId" ],
})
# Notify warehouse asynchronously
iii.call_void( "warehouse.ship" , { "orderId" : order[ "id" ]})
return { "success" : True , "paymentId" : payment[ "id" ]}
iii.register_function( "processOrder" , process_order)
Streaming Data with Channels
async def process_stream ( req ):
channel = await iii.create_channel()
# Process in background
iii.call_void( "processData" , { "input" : channel.reader_ref})
# Write data
for i in range ( 100 ):
await channel.writer.write({ "value" : i})
await channel.writer.close()
return { "status" : "processing" }
iii.register_function( "processStream" , process_stream)