Trigger factory functions create typed configuration objects that define when and how a step should be executed. Each trigger type has specific parameters and behavior.
http()
Create an HTTP API trigger for handling HTTP requests.
from motia import http
trigger = http("POST", "/api/orders")
Signature
def http(
method: ApiRouteMethod,
path: str,
*,
body_schema: Any | None = None,
response_schema: dict[int, Any] | None = None,
query_params: list[QueryParam] | None = None,
middleware: list[ApiMiddleware] | None = None,
condition: TriggerCondition | None = None,
) -> ApiTrigger
HTTP method: "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", or "HEAD".
URL path for the endpoint. Supports path parameters like /users/:id.
Pydantic model or schema for request body validation.
Response schemas mapped by status code for documentation and validation.
List of query parameter definitions with name and description.
Middleware functions to execute before the handler.
Function that determines if the trigger should fire: (input, ctx) -> bool.
Example
from motia import http, step, ApiRequest, ApiResponse, FlowContext
from pydantic import BaseModel
class CreateOrderBody(BaseModel):
description: str
amount: float
class OrderResponse(BaseModel):
id: str
status: str
def is_verified_user(request: ApiRequest, ctx: FlowContext) -> bool:
"""Only process requests from verified users."""
headers = request.headers
return headers.get("x-user-verified") == "true"
my_step = step({
"name": "CreateOrder",
"triggers": [
http(
"POST",
"/api/orders",
body_schema=CreateOrderBody,
response_schema={
201: OrderResponse,
400: dict,
},
condition=is_verified_user,
),
],
})
api()
api() is deprecated. Use http() instead.
Alias for http() maintained for backwards compatibility. Has identical signature and behavior.
queue()
Create a queue trigger for processing messages from a topic.
from motia import queue
trigger = queue("order.created")
Signature
def queue(
topic: str,
*,
input: Any | None = None,
infrastructure: InfrastructureConfig | None = None,
condition: TriggerCondition | None = None,
) -> QueueTrigger
Queue topic name to subscribe to.
Pydantic model or schema for message validation.
Override default infrastructure configuration for this trigger.
Function that determines if the trigger should fire: (input, ctx) -> bool.
Example
from motia import queue, step, FlowContext
from pydantic import BaseModel
class OrderData(BaseModel):
order_id: str
amount: float
def is_high_value(data: OrderData, ctx: FlowContext) -> bool:
"""Only process high-value orders."""
return data.amount > 1000
my_step = step({
"name": "ProcessHighValueOrder",
"triggers": [
queue(
"order.created",
input=OrderData,
condition=is_high_value,
infrastructure={
"handler": {"ram": 512, "timeout": 60},
"queue": {"max_retries": 5},
},
),
],
})
async def handler(data: OrderData, ctx: FlowContext) -> None:
ctx.logger.info(f"Processing order: {data.order_id}")
# Process the high-value order
cron()
Create a cron trigger for scheduled execution.
from motia import cron
trigger = cron("0 0 * * *") # Daily at midnight
Signature
def cron(
expression: str,
*,
condition: TriggerCondition | None = None,
) -> CronTrigger
Cron expression defining the schedule. Supports standard 5-field or extended 6-field format with seconds.
Function that determines if the trigger should fire: (input, ctx) -> bool.
# 6-field format (with seconds):
┌───────────── second (0-59)
│ ┌───────────── minute (0-59)
│ │ ┌───────────── hour (0-23)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12)
│ │ │ │ │ ┌───────────── day of week (0-6)
│ │ │ │ │ │
* * * * * *
# 5-field format (without seconds):
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6)
│ │ │ │ │
* * * * *
Example
from motia import cron, step, FlowContext
import datetime
def only_weekdays(input: None, ctx: FlowContext) -> bool:
"""Only run on weekdays."""
return datetime.datetime.now().weekday() < 5
my_step = step({
"name": "DailySummary",
"triggers": [
cron("0 9 * * *", condition=only_weekdays), # 9 AM daily, weekdays only
],
})
async def handler(input: None, ctx: FlowContext) -> None:
ctx.logger.info("Running daily summary")
# Generate and send summary
state()
Create a state trigger that fires when state changes occur.
from motia import state
trigger = state()
Signature
def state(
*,
condition: TriggerCondition | None = None,
) -> StateTrigger
Function that determines if the trigger should fire: (input, ctx) -> bool. Typically used to filter by scope/key.
Example
from motia import state, step, FlowContext, StateTriggerInput
def only_user_changes(input: StateTriggerInput, ctx: FlowContext) -> bool:
"""Only trigger for user state changes."""
return input.group_id == "users"
my_step = step({
"name": "OnUserStateChange",
"triggers": [
state(condition=only_user_changes),
],
})
async def handler(input: StateTriggerInput, ctx: FlowContext) -> None:
ctx.logger.info(
f"User {input.item_id} changed from {input.old_value} to {input.new_value}"
)
# React to state change
Always "state" for state triggers.
The state scope/group that changed.
The key within the scope that changed.
The previous value before the change.
The new value after the change.
stream()
Create a stream trigger that fires on stream events (create, update, delete).
from motia import stream
trigger = stream("todo")
Signature
def stream(
stream_name: str,
*,
group_id: str | None = None,
item_id: str | None = None,
condition: TriggerCondition | None = None,
) -> StreamTrigger
Name of the stream to subscribe to.
Filter to only receive events for a specific group.
Filter to only receive events for a specific item within a group.
Function that determines if the trigger should fire: (input, ctx) -> bool.
Example
from motia import stream, step, FlowContext, StreamTriggerInput
def only_creates(input: StreamTriggerInput, ctx: FlowContext) -> bool:
"""Only trigger on create events."""
return input.event.type == "create"
my_step = step({
"name": "OnTodoCreated",
"triggers": [
stream(
"todo",
group_id="user-123", # Only for this user
condition=only_creates,
),
],
})
async def handler(input: StreamTriggerInput, ctx: FlowContext) -> None:
ctx.logger.info(
f"New todo created: {input.id} in group {input.group_id}"
)
ctx.logger.info(f"Data: {input.event.data}")
# Process new todo
Always "stream" for stream triggers.
Name of the stream that fired the event.
The group ID where the event occurred.
The item ID that changed.
Unix timestamp (milliseconds) when the event occurred.
Event details with type ("create", "update", or "delete") and data.
Trigger conditions
All trigger types support an optional condition parameter that filters when the trigger fires.
Signature
TriggerCondition = Callable[[Any, FlowContext], bool | Awaitable[bool]]
Condition functions receive the input data and flow context, and return a boolean (or async boolean).
Example with multiple conditions
from motia import http, queue, step, FlowContext, ApiRequest
def is_admin(request: ApiRequest, ctx: FlowContext) -> bool:
"""Check if user has admin role."""
headers = request.headers
return headers.get("x-user-role") == "admin"
def is_high_priority(data: dict, ctx: FlowContext) -> bool:
"""Check if queue message is high priority."""
return data.get("priority") == "high"
my_step = step({
"name": "AdminActions",
"triggers": [
http("POST", "/admin/actions", condition=is_admin),
queue("admin.actions", condition=is_high_priority),
],
})
TriggerInfo
The ctx.trigger property provides information about which trigger fired:
type
'http' | 'queue' | 'cron' | 'state' | 'stream'
The type of trigger that fired.
Index of the trigger in the triggers list.
For HTTP triggers, the matched path.
For HTTP triggers, the HTTP method.
For queue triggers, the topic name.
For cron triggers, the cron expression.
Example
async def handler(input, ctx: FlowContext) -> None:
if ctx.trigger.type == "http":
ctx.logger.info(f"HTTP {ctx.trigger.method} {ctx.trigger.path}")
elif ctx.trigger.type == "queue":
ctx.logger.info(f"Queue message from {ctx.trigger.topic}")