Motia provides comprehensive type definitions for building type-safe workflow steps. All types are Pydantic models with full validation support.
HTTP types
ApiRequest
Represents an incoming HTTP API request.
class ApiRequest(BaseModel, Generic[TBody]):
path_params: dict[str, str]
query_params: dict[str, str | list[str]]
body: TBody | None
headers: dict[str, str | list[str]]
path_params
dict[str, str]
default:"{}"
Path parameters extracted from the URL pattern (e.g., {"id": "123"} from /users/:id).
query_params
dict[str, str | list[str]]
default:"{}"
Query string parameters. Values can be strings or lists for repeated params.
Parsed request body. Type is generic and can be constrained with Pydantic models.
HTTP headers. Values can be strings or lists for repeated headers.
Example
from motia import ApiRequest, ApiResponse, FlowContext, http, step
from pydantic import BaseModel
class CreateUserBody(BaseModel):
name: str
email: str
my_step = step({
"name": "CreateUser",
"triggers": [http("POST", "/users/:org_id")],
})
async def handler(request: ApiRequest[CreateUserBody], ctx: FlowContext) -> ApiResponse:
org_id = request.path_params.get("org_id")
user_role = request.query_params.get("role", "member")
auth_header = request.headers.get("authorization")
# Type-safe body access
name = request.body.name if request.body else "Unknown"
email = request.body.email if request.body else ""
ctx.logger.info(f"Creating user {name} in org {org_id} with role {user_role}")
return ApiResponse(status=201, body={"id": "user-123", "name": name})
ApiResponse
Represents an HTTP response returned from handlers.
class ApiResponse(BaseModel, Generic[TOutput]):
status: int
body: Any
headers: dict[str, str]
HTTP status code (e.g., 200, 201, 400, 404, 500).
Response body. Must be JSON-serializable.
Example
from motia import ApiResponse
# Success response
return ApiResponse(
status=200,
body={"message": "Success", "data": {"id": "123"}},
headers={"x-request-id": "abc123"},
)
# Error response
return ApiResponse(
status=400,
body={"error": "Invalid input"},
)
# No content
return ApiResponse(status=204, body=None)
MotiaHttpRequest
HTTP request for streaming API triggers with channel support.
class MotiaHttpRequest(BaseModel, Generic[TBody]):
path_params: dict[str, str]
query_params: dict[str, str | list[str]]
body: TBody | None
headers: dict[str, str | list[str]]
method: str
request_body: Any # ChannelReader
Similar to ApiRequest but includes method and request_body for streaming use cases.
MotiaHttpResponse
Streaming HTTP response for channel-based API triggers.
class MotiaHttpResponse:
async def status(self, status_code: int) -> None
async def headers(self, headers: dict[str, str]) -> None
@property
def writer(self) -> Any # ChannelWriter
def close(self) -> None
Set the HTTP status code.
Channel writer for streaming response body.
Close the response channel.
MotiaHttpArgs
Combines request and response for streaming handlers.
class MotiaHttpArgs(BaseModel, Generic[TBody]):
request: MotiaHttpRequest[TBody]
response: MotiaHttpResponse
Provides backward-compatible property accessors for path_params, query_params, body, headers, method, and request_body.
QueryParam
Defines a query parameter for API documentation.
class QueryParam(BaseModel):
name: str
description: str
Example
from motia import http, QueryParam
trigger = http(
"GET",
"/users",
query_params=[
QueryParam(name="page", description="Page number for pagination"),
QueryParam(name="limit", description="Number of items per page"),
QueryParam(name="role", description="Filter by user role"),
],
)
ApiRouteMethod
HTTP method type alias.
ApiRouteMethod = Literal["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
Trigger types
ApiTrigger
HTTP API trigger configuration.
class ApiTrigger(BaseModel):
type: Literal["http"]
path: str
method: ApiRouteMethod
condition: TriggerCondition | None
middleware: list[ApiMiddleware] | None
body_schema: Any | None
response_schema: dict[int, Any] | None
query_params: list[QueryParam] | None
See http() for usage details.
QueueTrigger
Queue message trigger configuration.
class QueueTrigger(BaseModel):
type: Literal["queue"]
topic: str
condition: TriggerCondition | None
input: Any | None
infrastructure: InfrastructureConfig | None
See queue() for usage details.
CronTrigger
Scheduled cron trigger configuration.
class CronTrigger(BaseModel):
type: Literal["cron"]
expression: str
condition: TriggerCondition | None
See cron() for usage details.
StateTrigger
State change trigger configuration.
class StateTrigger(BaseModel):
type: Literal["state"]
condition: TriggerCondition | None
See state() for usage details.
StreamTrigger
Stream event trigger configuration.
class StreamTrigger(BaseModel):
type: Literal["stream"]
stream_name: str
group_id: str | None
item_id: str | None
condition: TriggerCondition | None
See stream() for usage details.
TriggerConfig
Union type for all trigger configurations.
TriggerConfig = QueueTrigger | ApiTrigger | CronTrigger | StateTrigger | StreamTrigger
TriggerCondition
Type for trigger condition functions.
TriggerCondition = Callable[[Any, FlowContext], bool | Awaitable[bool]]
Example
from motia import FlowContext, ApiRequest
# Synchronous condition
def is_admin(request: ApiRequest, ctx: FlowContext) -> bool:
return request.headers.get("x-user-role") == "admin"
# Async condition
async def has_permission(request: ApiRequest, ctx: FlowContext) -> bool:
user_id = request.headers.get("x-user-id")
user = await ctx.state.get("users", user_id)
return user and user.get("permissions", {}).get("admin", False)
TriggerInfo
Information about the trigger that fired.
class TriggerInfo(BaseModel):
type: Literal["http", "queue", "cron", "state", "stream"]
index: int | None
# HTTP-specific
path: str | None
method: str | None
# Queue-specific
topic: str | None
# Cron-specific
expression: str | None
Available via ctx.trigger in handlers.
Input received when a state trigger fires.
class StateTriggerInput(BaseModel):
type: Literal["state"]
group_id: str
item_id: str
old_value: Any | None
new_value: Any | None
The state scope that changed.
The key within the scope that changed.
Previous value before the change.
New value after the change.
Input received when a stream trigger fires.
class StreamTriggerInput(BaseModel):
type: Literal["stream"]
timestamp: int
stream_name: str
group_id: str
id: str
event: StreamEvent
Unix timestamp in milliseconds when the event occurred.
Name of the stream that fired the event.
The group where the event occurred.
The item ID that changed.
Event details with type and data.
StreamEvent
Event data from a stream operation.
class StreamEvent(BaseModel):
type: Literal["create", "update", "delete"]
data: Any
type
'create' | 'update' | 'delete'
The type of stream operation that occurred.
The item data associated with the event.
Configuration types
StepConfig
Complete step configuration.
class StepConfig(BaseModel):
name: str
triggers: list[TriggerConfig]
enqueues: list[str | Enqueue]
virtual_enqueues: list[str | Enqueue] | None
virtual_subscribes: list[str] | None
description: str | None
flows: list[str] | None
include_files: list[str] | None
infrastructure: InfrastructureConfig | None
See step() for usage details.
Enqueue
Enqueue configuration with options.
class Enqueue(BaseModel):
topic: str
label: str | None
conditional: bool
If true, only used when explicitly called.
InfrastructureConfig
Infrastructure configuration for handlers and queues.
class InfrastructureConfig(BaseModel):
handler: HandlerConfig | None
queue: QueueConfig | None
HandlerConfig
Handler compute resource configuration.
class HandlerConfig(BaseModel):
ram: int # Default: 128 MB
cpu: int | None
timeout: int # Default: 30 seconds
QueueConfig
Queue behavior configuration.
class QueueConfig(BaseModel):
type: Literal["fifo", "standard"] # Default: "standard"
max_retries: int # Default: 3
visibility_timeout: int # Default: 30 seconds
delay_seconds: int # Default: 0
Stream types
StreamConfig
Stream configuration with event handlers.
class StreamConfig(BaseModel):
name: str
description: str | None
schema: dict[str, Any] | None
base_config: dict[str, Any] | None
on_join: Callable[[StreamSubscription, FlowContext, Any], Awaitable[Any]] | None
on_leave: Callable[[StreamSubscription, FlowContext, Any], Awaitable[Any]] | None
See Stream API for usage details.
StreamSubscription
Stream subscription details.
class StreamSubscription(BaseModel):
group_id: str
id: str | None
Authentication input for stream connections.
class StreamAuthInput(BaseModel):
headers: dict[str, str]
path: str
query_params: dict[str, list[str]]
addr: str
StreamAuthResult
Authentication result for stream connections.
class StreamAuthResult(BaseModel):
authorized: bool
context: Any | None
Step types
Step
Represents a step in a flow.
class Step(BaseModel):
file_path: str
config: StepConfig
StepDefinition
Complete step definition with handler.
@dataclass
class StepDefinition:
config: StepConfig
handler: StepHandler
Returned by step() when a handler is provided.
StepBuilder
Builder for creating step definitions.
class StepBuilder:
def __init__(self, config: StepConfig) -> None
def handle(self, handler: StepHandler) -> StepDefinition
Returned by step() when no handler is provided.
Callable types
Enqueuer
Function type for enqueueing messages.
Enqueuer = Callable[[Any], Awaitable[None]]
Available as ctx.enqueue.
ApiMiddleware
Middleware function type for HTTP triggers.
ApiMiddleware = Callable[
[ApiRequest[Any], FlowContext[Any], Callable[[], Awaitable[ApiResponse[Any]]]],
Awaitable[ApiResponse[Any]],
]
Example
from motia import ApiRequest, ApiResponse, FlowContext, ApiMiddleware
import time
async def timing_middleware(
request: ApiRequest,
ctx: FlowContext,
next: Callable[[], Awaitable[ApiResponse]],
) -> ApiResponse:
"""Measure request processing time."""
start = time.time()
response = await next()
duration = time.time() - start
ctx.logger.info(f"Request took {duration:.3f}s")
response.headers["x-duration-ms"] = str(int(duration * 1000))
return response
Type variables
Motia uses generic type variables for type safety:
TInput = TypeVar("TInput") # Input type
TOutput = TypeVar("TOutput") # Output type
TBody = TypeVar("TBody") # Request body type
TData = TypeVar("TData") # Stream/state data type
TEnqueueData = TypeVar("TEnqueueData") # Enqueue data type
Example with type parameters
from motia import ApiRequest, ApiResponse, FlowContext, step, http
from pydantic import BaseModel
class CreateOrderRequest(BaseModel):
description: str
amount: float
class OrderResponse(BaseModel):
id: str
status: str
my_step = step({
"name": "CreateOrder",
"triggers": [http("POST", "/orders")],
})
async def handler(
request: ApiRequest[CreateOrderRequest],
ctx: FlowContext,
) -> ApiResponse[OrderResponse]:
# Type-safe access to request.body
description = request.body.description if request.body else ""
return ApiResponse(
status=201,
body=OrderResponse(id="order-123", status="pending"),
)