Documentation Index
Fetch the complete documentation index at: https://mintlify.com/MotiaDev/motia/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Triggers define when a step executes. Motia supports HTTP/API triggers, queue triggers, cron schedules, state change triggers, and stream event triggers.
HTTP Triggers
http
from motia.triggers import http
trigger = http(method, path, **options)
Create an HTTP/API trigger configuration.
HTTP method: "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", or "HEAD".
URL path pattern. Can include path parameters like /users/:id.
Pydantic model or JSON schema for request body validation.
Response schemas keyed by status code. Values can be Pydantic models or JSON schemas.
List of query parameter definitions.
List of middleware functions to process requests.
condition
Callable[[ApiRequest, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter requests.
HTTP trigger configuration.
Example
from motia.triggers import http
from pydantic import BaseModel
class UserInput(BaseModel):
name: str
email: str
class UserOutput(BaseModel):
id: str
name: str
trigger = http(
"POST",
"/api/users",
body_schema=UserInput,
response_schema={
201: UserOutput,
400: {"type": "object", "properties": {"error": {"type": "string"}}}
},
middleware=[auth_middleware, logging_middleware]
)
api (Deprecated)
from motia.triggers import api
api() is deprecated. Use http() instead.
Queue Triggers
queue
from motia.triggers import queue
trigger = queue(topic, **options)
Create a queue trigger configuration.
Queue topic name to subscribe to.
Pydantic model or JSON schema for input validation.
Infrastructure configuration for queue and handler resources.
condition
Callable[[Any, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter messages.
Queue trigger configuration.
Example
from motia.triggers import queue
from motia.types import InfrastructureConfig, QueueConfig
from pydantic import BaseModel
class OrderEvent(BaseModel):
order_id: str
status: str
trigger = queue(
"order-events",
input=OrderEvent,
infrastructure=InfrastructureConfig(
queue=QueueConfig(
type="fifo",
max_retries=5,
visibility_timeout=60
)
),
condition=lambda data, ctx: data.status == "completed"
)
Cron Triggers
cron
from motia.triggers import cron
trigger = cron(expression, **options)
Create a cron schedule trigger.
Cron expression in standard format: minute hour day month weekday.Examples:
"0 0 * * *" - Daily at midnight
"*/15 * * * *" - Every 15 minutes
"0 9 * * 1-5" - Weekdays at 9 AM
condition
Callable[[None, FlowContext], bool | Awaitable[bool]]
Optional condition function to conditionally skip executions.
Cron trigger configuration.
Example
from motia.triggers import cron
# Run daily at 2 AM
daily_trigger = cron("0 2 * * *")
# Run every hour on weekdays
weekday_trigger = cron("0 * * * 1-5")
# Run every 30 minutes
frequent_trigger = cron("*/30 * * * *")
State Triggers
state
from motia.triggers import state
trigger = state(**options)
Create a state change trigger that fires when state is modified.
condition
Callable[[StateTriggerInput, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter state changes.
State trigger configuration.
Example
from motia.triggers import state
# Trigger on any state change
any_change = state()
# Trigger only when specific conditions are met
conditional = state(
condition=lambda input_data, ctx: (
input_data.group_id == "users" and
input_data.new_value.get("status") == "active"
)
)
Input received when a state trigger fires.
The state scope that changed.
The item key that changed.
Previous value before the change.
New value after the change.
Stream Triggers
stream
from motia.triggers import stream
trigger = stream(stream_name, **options)
Create a stream event trigger that fires on stream operations.
Name of the stream to subscribe to.
Optional group ID to filter events.
Optional item ID to filter events.
condition
Callable[[StreamTriggerInput, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter events.
Stream trigger configuration.
Example
from motia.triggers import stream
# Subscribe to all events in a stream
all_events = stream("chat")
# Subscribe to events in a specific group
room_events = stream("chat", group_id="room-123")
# Subscribe to events for a specific item
user_events = stream("chat", group_id="room-123", item_id="user-456")
# Conditional stream trigger
create_only = stream(
"chat",
condition=lambda input_data, ctx: input_data.event.type == "create"
)
Input received when a stream trigger fires.
Group identifier where the event occurred.
Event timestamp in milliseconds.
Event details with type (“create”, “update”, “delete”) and data.
Request Types
ApiRequest[TBody]
API request object for HTTP triggers.
Path parameters extracted from the URL.
query_params
dict[str, str | list[str]]
Query string parameters.
Request body, validated against body_schema if provided.
MotiaHttpArgs[TBody]
Full HTTP request/response object for streaming responses.
Request details including body, headers, path/query params, and method.
Response writer for streaming responses.
MotiaHttpResponse Methods
await response.status(200)
await response.headers({"Content-Type": "application/json"})
response.writer.stream.write(b"data")
response.close()
Response Types
ApiResponse[TOutput]
API response object returned from HTTP handlers.
Example
return {
"status": 201,
"body": {"id": "user-123", "name": "Alice"},
"headers": {"X-Request-ID": ctx.trace_id}
}
Middleware
ApiMiddleware
Middleware function signature for HTTP triggers.
from motia.types import ApiRequest, ApiResponse, FlowContext
async def my_middleware(
req: ApiRequest,
ctx: FlowContext,
next: Callable[[], Awaitable[ApiResponse]]
) -> ApiResponse:
# Pre-processing
ctx.logger.info(f"Request to {ctx.trigger.path}")
# Call next middleware/handler
response = await next()
# Post-processing
response.headers["X-Powered-By"] = "Motia"
return response
Usage Examples
Multi-Trigger Step
from motia import step, StepConfig
from motia.triggers import http, queue, cron
my_step = step(
StepConfig(
name="multi-trigger-step",
triggers=[
http("GET", "/health"),
queue("events"),
cron("0 * * * *")
]
),
handler=async def handle(input_data, ctx):
return await ctx.match({
"http": lambda req: {"status": 200, "body": {"ok": True}},
"queue": lambda data: process_event(data),
"cron": lambda: cleanup_task(),
})
)
Conditional Triggers
from motia.triggers import http, queue
# Only process premium users
queue_trigger = queue(
"user-events",
condition=lambda data, ctx: data.get("tier") == "premium"
)
# Only handle POST requests with specific header
api_trigger = http(
"POST",
"/webhook",
condition=lambda req, ctx: req.headers.get("X-API-Key") == "secret"
)
Stream with Filters
from motia.triggers import stream
# React to new chat messages only
chat_trigger = stream(
"chat",
group_id="room-123",
condition=lambda input_data, ctx: input_data.event.type == "create"
)
- Step - Creating step definitions
- Context - Flow context for handlers
- Motia - Runtime manager