Utilities
Motia exports several utility functions and helpers for advanced use cases in Python.
Type guards
Trigger type guards
Type guards for checking trigger types:
from motia import is_api_trigger, is_queue_trigger, is_cron_trigger, is_state_trigger, is_stream_trigger
from motia.types import TriggerConfig
trigger: TriggerConfig = {
"type": "http",
"method": "GET",
"path": "/users",
}
if is_api_trigger(trigger):
print(f"{trigger['method']} {trigger['path']}")
is_api_trigger
(trigger: TriggerConfig) -> bool
Returns True if the trigger is an HTTP/API trigger
is_queue_trigger
(trigger: TriggerConfig) -> bool
Returns True if the trigger is a queue trigger
is_cron_trigger
(trigger: TriggerConfig) -> bool
Returns True if the trigger is a cron trigger
is_state_trigger
(trigger: TriggerConfig) -> bool
Returns True if the trigger is a state trigger
is_stream_trigger
(trigger: TriggerConfig) -> bool
Returns True if the trigger is a stream trigger
Step type guards
Type guards for checking if a Step has specific trigger types:
from motia import is_api_step, is_queue_step, is_cron_step, is_state_step, is_stream_step
from motia.types import Step
step: Step = {
"filePath": "steps/hello_step.py",
"config": {
"name": "Hello",
"triggers": [
{"type": "http", "method": "GET", "path": "/hello"}
],
},
}
if is_api_step(step):
print("This step has HTTP triggers")
Returns True if the step has at least one HTTP trigger
Returns True if the step has at least one queue trigger
Returns True if the step has at least one cron trigger
Returns True if the step has at least one state trigger
Returns True if the step has at least one stream trigger
Trigger getters
Functions to extract specific trigger types from a Step:
from motia import get_api_triggers, get_queue_triggers
from motia.types import Step
step: Step = {
"filePath": "steps/multi_step.py",
"config": {
"name": "MultiTrigger",
"triggers": [
{"type": "http", "method": "POST", "path": "/orders"},
{"type": "queue", "topic": "order.created"},
],
},
}
api_triggers = get_api_triggers(step)
print(api_triggers[0]["method"]) # 'POST'
queue_triggers = get_queue_triggers(step)
print(queue_triggers[0]["topic"]) # 'order.created'
get_api_triggers
(step: Step) -> list[ApiTrigger]
Returns all HTTP triggers from a step
get_queue_triggers
(step: Step) -> list[QueueTrigger]
Returns all queue triggers from a step
get_cron_triggers
(step: Step) -> list[CronTrigger]
Returns all cron triggers from a step
get_state_triggers
(step: Step) -> list[StateTrigger]
Returns all state triggers from a step
get_stream_triggers
(step: Step) -> list[StreamTrigger]
Returns all stream triggers from a step
iii SDK utilities
ChannelReader and ChannelWriter
Low-level streaming interfaces for request/response bodies:
from motia import ChannelReader, ChannelWriter
async def handler(args, ctx):
# Read request body as stream
reader: ChannelReader = args["request"].request_body
# Write response as stream
writer: ChannelWriter = args["response"].stream
args["response"].status(200)
args["response"].headers({"Content-Type": "text/plain"})
# Stream response
async for chunk in reader:
await writer.write(chunk)
args["response"].close()
ChannelReader and ChannelWriter are advanced APIs for streaming. Most use cases should use the simpler request.body and response APIs.
Runtime utilities
init_iii
Initialize the iii engine connection:
from motia import init_iii
await init_iii()
Initializes the connection to the iii engine. Automatically called by the framework.
get_instance
Get the iii engine client instance:
from motia import get_instance
client = get_instance()
Returns the iii engine client instance for advanced use cases.
generate_step_id
Generate a deterministic ID for a step based on its file path:
from motia import generate_step_id
step_id = generate_step_id("steps/hello_step.py")
print(step_id) # Deterministic hash
Generates a unique, deterministic ID for a step file.
setup_step_endpoint
Low-level function to register a step with the iii engine:
from motia import setup_step_endpoint
async def my_handler(input, ctx):
return {"status": 200, "body": {"message": "Hello"}}
await setup_step_endpoint(
file_path="steps/hello_step.py",
config={
"name": "Hello",
"triggers": [{"type": "http", "method": "GET", "path": "/hello"}],
},
handler=my_handler,
)
This is a low-level API. Most applications should use the file-based Step discovery instead.
Motia class
The core runtime class:
from motia import Motia
motia = Motia()
The Motia class is used internally by the framework. Most applications don’t need to instantiate it directly.
stateManager
Global state manager instance:
from motia import stateManager
# Access state outside of handlers (not recommended)
value = await stateManager.get("users", "user-123")
Prefer using ctx.state inside handlers instead of the global stateManager instance.
schema_to_json_schema
Convert Pydantic models to JSON Schema:
from motia import schema_to_json_schema
from pydantic import BaseModel
class User(BaseModel):
name: str
email: str
json_schema = schema_to_json_schema(User)
schema_to_json_schema
(schema: type[BaseModel]) -> dict
Converts a Pydantic model to JSON Schema format for API validation.
Tracing
The tracing module provides OpenTelemetry integration:
from motia import tracing
# Configure tracing
tracing.init(
service_name="my-service",
otlp_endpoint="http://localhost:4318",
)
See Observability for more details on tracing configuration.
Step builders
StepBuilder and StepDefinition
Builder pattern for creating steps:
from motia import StepBuilder, StepDefinition
from motia.triggers import http
step_def: StepDefinition = (
StepBuilder()
.name("Hello")
.trigger(http(method="GET", path="/hello"))
.handler(lambda req, ctx: {"status": 200, "body": {"message": "Hello"}})
.build()
)
MultiTriggerStepBuilder
Builder for multi-trigger steps:
from motia import MultiTriggerStepBuilder, multi_trigger_step
from motia.triggers import http, queue
step = multi_trigger_step(
name="ProcessOrder",
triggers=[
http(method="POST", path="/orders"),
queue(topic="order.created"),
],
handler=lambda input, ctx: ctx.match(
http=lambda req: {"status": 200, "body": {"ok": True}},
queue=lambda data: ctx.logger.info("Processing order", data),
),
)
Next steps
Context API
Learn about the FlowContext API
Types
Explore all Python types
Step API
Create steps with the step() function
State manager
Manage application state