Skip to main content
Motia provides comprehensive type definitions for building type-safe workflow steps. All types are Pydantic models with full validation support.

HTTP types

ApiRequest

Represents an incoming HTTP API request.
class ApiRequest(BaseModel, Generic[TBody]):
    path_params: dict[str, str]
    query_params: dict[str, str | list[str]]
    body: TBody | None
    headers: dict[str, str | list[str]]
path_params
dict[str, str]
default:"{}"
Path parameters extracted from the URL pattern (e.g., {"id": "123"} from /users/:id).
query_params
dict[str, str | list[str]]
default:"{}"
Query string parameters. Values can be strings or lists for repeated params.
body
TBody | None
Parsed request body. Type is generic and can be constrained with Pydantic models.
headers
dict[str, str | list[str]]
default:"{}"
HTTP headers. Values can be strings or lists for repeated headers.

Example

from motia import ApiRequest, ApiResponse, FlowContext, http, step
from pydantic import BaseModel

class CreateUserBody(BaseModel):
    name: str
    email: str

my_step = step({
    "name": "CreateUser",
    "triggers": [http("POST", "/users/:org_id")],
})

async def handler(request: ApiRequest[CreateUserBody], ctx: FlowContext) -> ApiResponse:
    org_id = request.path_params.get("org_id")
    user_role = request.query_params.get("role", "member")
    auth_header = request.headers.get("authorization")
    
    # Type-safe body access
    name = request.body.name if request.body else "Unknown"
    email = request.body.email if request.body else ""
    
    ctx.logger.info(f"Creating user {name} in org {org_id} with role {user_role}")
    
    return ApiResponse(status=201, body={"id": "user-123", "name": name})

ApiResponse

Represents an HTTP response returned from handlers.
class ApiResponse(BaseModel, Generic[TOutput]):
    status: int
    body: Any
    headers: dict[str, str]
status
int
required
HTTP status code (e.g., 200, 201, 400, 404, 500).
body
Any
required
Response body. Must be JSON-serializable.
headers
dict[str, str]
default:"{}"
HTTP response headers.

Example

from motia import ApiResponse

# Success response
return ApiResponse(
    status=200,
    body={"message": "Success", "data": {"id": "123"}},
    headers={"x-request-id": "abc123"},
)

# Error response
return ApiResponse(
    status=400,
    body={"error": "Invalid input"},
)

# No content
return ApiResponse(status=204, body=None)

MotiaHttpRequest

HTTP request for streaming API triggers with channel support.
class MotiaHttpRequest(BaseModel, Generic[TBody]):
    path_params: dict[str, str]
    query_params: dict[str, str | list[str]]
    body: TBody | None
    headers: dict[str, str | list[str]]
    method: str
    request_body: Any  # ChannelReader
Similar to ApiRequest but includes method and request_body for streaming use cases.

MotiaHttpResponse

Streaming HTTP response for channel-based API triggers.
class MotiaHttpResponse:
    async def status(self, status_code: int) -> None
    async def headers(self, headers: dict[str, str]) -> None
    @property
    def writer(self) -> Any  # ChannelWriter
    def close(self) -> None
status()
async method
Set the HTTP status code.
headers()
async method
Set response headers.
writer
ChannelWriter
Channel writer for streaming response body.
close()
method
Close the response channel.

MotiaHttpArgs

Combines request and response for streaming handlers.
class MotiaHttpArgs(BaseModel, Generic[TBody]):
    request: MotiaHttpRequest[TBody]
    response: MotiaHttpResponse
Provides backward-compatible property accessors for path_params, query_params, body, headers, method, and request_body.

QueryParam

Defines a query parameter for API documentation.
class QueryParam(BaseModel):
    name: str
    description: str

Example

from motia import http, QueryParam

trigger = http(
    "GET",
    "/users",
    query_params=[
        QueryParam(name="page", description="Page number for pagination"),
        QueryParam(name="limit", description="Number of items per page"),
        QueryParam(name="role", description="Filter by user role"),
    ],
)

ApiRouteMethod

HTTP method type alias.
ApiRouteMethod = Literal["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]

Trigger types

ApiTrigger

HTTP API trigger configuration.
class ApiTrigger(BaseModel):
    type: Literal["http"]
    path: str
    method: ApiRouteMethod
    condition: TriggerCondition | None
    middleware: list[ApiMiddleware] | None
    body_schema: Any | None
    response_schema: dict[int, Any] | None
    query_params: list[QueryParam] | None
See http() for usage details.

QueueTrigger

Queue message trigger configuration.
class QueueTrigger(BaseModel):
    type: Literal["queue"]
    topic: str
    condition: TriggerCondition | None
    input: Any | None
    infrastructure: InfrastructureConfig | None
See queue() for usage details.

CronTrigger

Scheduled cron trigger configuration.
class CronTrigger(BaseModel):
    type: Literal["cron"]
    expression: str
    condition: TriggerCondition | None
See cron() for usage details.

StateTrigger

State change trigger configuration.
class StateTrigger(BaseModel):
    type: Literal["state"]
    condition: TriggerCondition | None
See state() for usage details.

StreamTrigger

Stream event trigger configuration.
class StreamTrigger(BaseModel):
    type: Literal["stream"]
    stream_name: str
    group_id: str | None
    item_id: str | None
    condition: TriggerCondition | None
See stream() for usage details.

TriggerConfig

Union type for all trigger configurations.
TriggerConfig = QueueTrigger | ApiTrigger | CronTrigger | StateTrigger | StreamTrigger

TriggerCondition

Type for trigger condition functions.
TriggerCondition = Callable[[Any, FlowContext], bool | Awaitable[bool]]

Example

from motia import FlowContext, ApiRequest

# Synchronous condition
def is_admin(request: ApiRequest, ctx: FlowContext) -> bool:
    return request.headers.get("x-user-role") == "admin"

# Async condition
async def has_permission(request: ApiRequest, ctx: FlowContext) -> bool:
    user_id = request.headers.get("x-user-id")
    user = await ctx.state.get("users", user_id)
    return user and user.get("permissions", {}).get("admin", False)

TriggerInfo

Information about the trigger that fired.
class TriggerInfo(BaseModel):
    type: Literal["http", "queue", "cron", "state", "stream"]
    index: int | None
    
    # HTTP-specific
    path: str | None
    method: str | None
    
    # Queue-specific
    topic: str | None
    
    # Cron-specific
    expression: str | None
Available via ctx.trigger in handlers.

Trigger input types

StateTriggerInput

Input received when a state trigger fires.
class StateTriggerInput(BaseModel):
    type: Literal["state"]
    group_id: str
    item_id: str
    old_value: Any | None
    new_value: Any | None
group_id
string
The state scope that changed.
item_id
string
The key within the scope that changed.
old_value
Any
Previous value before the change.
new_value
Any
New value after the change.

StreamTriggerInput

Input received when a stream trigger fires.
class StreamTriggerInput(BaseModel):
    type: Literal["stream"]
    timestamp: int
    stream_name: str
    group_id: str
    id: str
    event: StreamEvent
timestamp
int
Unix timestamp in milliseconds when the event occurred.
stream_name
string
Name of the stream that fired the event.
group_id
string
The group where the event occurred.
id
string
The item ID that changed.
event
StreamEvent
Event details with type and data.

StreamEvent

Event data from a stream operation.
class StreamEvent(BaseModel):
    type: Literal["create", "update", "delete"]
    data: Any
type
'create' | 'update' | 'delete'
The type of stream operation that occurred.
data
Any
The item data associated with the event.

Configuration types

StepConfig

Complete step configuration.
class StepConfig(BaseModel):
    name: str
    triggers: list[TriggerConfig]
    enqueues: list[str | Enqueue]
    virtual_enqueues: list[str | Enqueue] | None
    virtual_subscribes: list[str] | None
    description: str | None
    flows: list[str] | None
    include_files: list[str] | None
    infrastructure: InfrastructureConfig | None
See step() for usage details.

Enqueue

Enqueue configuration with options.
class Enqueue(BaseModel):
    topic: str
    label: str | None
    conditional: bool
topic
string
required
Queue topic name.
label
string
Human-readable label.
conditional
bool
default:"false"
If true, only used when explicitly called.

InfrastructureConfig

Infrastructure configuration for handlers and queues.
class InfrastructureConfig(BaseModel):
    handler: HandlerConfig | None
    queue: QueueConfig | None

HandlerConfig

Handler compute resource configuration.
class HandlerConfig(BaseModel):
    ram: int  # Default: 128 MB
    cpu: int | None
    timeout: int  # Default: 30 seconds

QueueConfig

Queue behavior configuration.
class QueueConfig(BaseModel):
    type: Literal["fifo", "standard"]  # Default: "standard"
    max_retries: int  # Default: 3
    visibility_timeout: int  # Default: 30 seconds
    delay_seconds: int  # Default: 0

Stream types

StreamConfig

Stream configuration with event handlers.
class StreamConfig(BaseModel):
    name: str
    description: str | None
    schema: dict[str, Any] | None
    base_config: dict[str, Any] | None
    on_join: Callable[[StreamSubscription, FlowContext, Any], Awaitable[Any]] | None
    on_leave: Callable[[StreamSubscription, FlowContext, Any], Awaitable[Any]] | None
See Stream API for usage details.

StreamSubscription

Stream subscription details.
class StreamSubscription(BaseModel):
    group_id: str
    id: str | None

StreamAuthInput

Authentication input for stream connections.
class StreamAuthInput(BaseModel):
    headers: dict[str, str]
    path: str
    query_params: dict[str, list[str]]
    addr: str

StreamAuthResult

Authentication result for stream connections.
class StreamAuthResult(BaseModel):
    authorized: bool
    context: Any | None

Step types

Step

Represents a step in a flow.
class Step(BaseModel):
    file_path: str
    config: StepConfig

StepDefinition

Complete step definition with handler.
@dataclass
class StepDefinition:
    config: StepConfig
    handler: StepHandler
Returned by step() when a handler is provided.

StepBuilder

Builder for creating step definitions.
class StepBuilder:
    def __init__(self, config: StepConfig) -> None
    def handle(self, handler: StepHandler) -> StepDefinition
Returned by step() when no handler is provided.

Callable types

Enqueuer

Function type for enqueueing messages.
Enqueuer = Callable[[Any], Awaitable[None]]
Available as ctx.enqueue.

ApiMiddleware

Middleware function type for HTTP triggers.
ApiMiddleware = Callable[
    [ApiRequest[Any], FlowContext[Any], Callable[[], Awaitable[ApiResponse[Any]]]],
    Awaitable[ApiResponse[Any]],
]

Example

from motia import ApiRequest, ApiResponse, FlowContext, ApiMiddleware
import time

async def timing_middleware(
    request: ApiRequest,
    ctx: FlowContext,
    next: Callable[[], Awaitable[ApiResponse]],
) -> ApiResponse:
    """Measure request processing time."""
    start = time.time()
    
    response = await next()
    
    duration = time.time() - start
    ctx.logger.info(f"Request took {duration:.3f}s")
    
    response.headers["x-duration-ms"] = str(int(duration * 1000))
    return response

Type variables

Motia uses generic type variables for type safety:
TInput = TypeVar("TInput")        # Input type
TOutput = TypeVar("TOutput")      # Output type
TBody = TypeVar("TBody")          # Request body type
TData = TypeVar("TData")          # Stream/state data type
TEnqueueData = TypeVar("TEnqueueData")  # Enqueue data type

Example with type parameters

from motia import ApiRequest, ApiResponse, FlowContext, step, http
from pydantic import BaseModel

class CreateOrderRequest(BaseModel):
    description: str
    amount: float

class OrderResponse(BaseModel):
    id: str
    status: str

my_step = step({
    "name": "CreateOrder",
    "triggers": [http("POST", "/orders")],
})

async def handler(
    request: ApiRequest[CreateOrderRequest],
    ctx: FlowContext,
) -> ApiResponse[OrderResponse]:
    # Type-safe access to request.body
    description = request.body.description if request.body else ""
    
    return ApiResponse(
        status=201,
        body=OrderResponse(id="order-123", status="pending"),
    )

Build docs developers (and LLMs) love