Skip to main content
Trigger factory functions create typed configuration objects that define when and how a step should be executed. Each trigger type has specific parameters and behavior.

http()

Create an HTTP API trigger for handling HTTP requests.
from motia import http

trigger = http("POST", "/api/orders")

Signature

def http(
    method: ApiRouteMethod,
    path: str,
    *,
    body_schema: Any | None = None,
    response_schema: dict[int, Any] | None = None,
    query_params: list[QueryParam] | None = None,
    middleware: list[ApiMiddleware] | None = None,
    condition: TriggerCondition | None = None,
) -> ApiTrigger
method
ApiRouteMethod
required
HTTP method: "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", or "HEAD".
path
string
required
URL path for the endpoint. Supports path parameters like /users/:id.
body_schema
Any
Pydantic model or schema for request body validation.
response_schema
dict[int, Any]
Response schemas mapped by status code for documentation and validation.
query_params
list[QueryParam]
List of query parameter definitions with name and description.
middleware
list[ApiMiddleware]
Middleware functions to execute before the handler.
condition
TriggerCondition
Function that determines if the trigger should fire: (input, ctx) -> bool.

Example

from motia import http, step, ApiRequest, ApiResponse, FlowContext
from pydantic import BaseModel

class CreateOrderBody(BaseModel):
    description: str
    amount: float

class OrderResponse(BaseModel):
    id: str
    status: str

def is_verified_user(request: ApiRequest, ctx: FlowContext) -> bool:
    """Only process requests from verified users."""
    headers = request.headers
    return headers.get("x-user-verified") == "true"

my_step = step({
    "name": "CreateOrder",
    "triggers": [
        http(
            "POST",
            "/api/orders",
            body_schema=CreateOrderBody,
            response_schema={
                201: OrderResponse,
                400: dict,
            },
            condition=is_verified_user,
        ),
    ],
})

api()

api() is deprecated. Use http() instead.
Alias for http() maintained for backwards compatibility. Has identical signature and behavior.

queue()

Create a queue trigger for processing messages from a topic.
from motia import queue

trigger = queue("order.created")

Signature

def queue(
    topic: str,
    *,
    input: Any | None = None,
    infrastructure: InfrastructureConfig | None = None,
    condition: TriggerCondition | None = None,
) -> QueueTrigger
topic
string
required
Queue topic name to subscribe to.
input
Any
Pydantic model or schema for message validation.
infrastructure
InfrastructureConfig
Override default infrastructure configuration for this trigger.
condition
TriggerCondition
Function that determines if the trigger should fire: (input, ctx) -> bool.

Example

from motia import queue, step, FlowContext
from pydantic import BaseModel

class OrderData(BaseModel):
    order_id: str
    amount: float

def is_high_value(data: OrderData, ctx: FlowContext) -> bool:
    """Only process high-value orders."""
    return data.amount > 1000

my_step = step({
    "name": "ProcessHighValueOrder",
    "triggers": [
        queue(
            "order.created",
            input=OrderData,
            condition=is_high_value,
            infrastructure={
                "handler": {"ram": 512, "timeout": 60},
                "queue": {"max_retries": 5},
            },
        ),
    ],
})

async def handler(data: OrderData, ctx: FlowContext) -> None:
    ctx.logger.info(f"Processing order: {data.order_id}")
    # Process the high-value order

cron()

Create a cron trigger for scheduled execution.
from motia import cron

trigger = cron("0 0 * * *")  # Daily at midnight

Signature

def cron(
    expression: str,
    *,
    condition: TriggerCondition | None = None,
) -> CronTrigger
expression
string
required
Cron expression defining the schedule. Supports standard 5-field or extended 6-field format with seconds.
condition
TriggerCondition
Function that determines if the trigger should fire: (input, ctx) -> bool.

Cron expression format

# 6-field format (with seconds):
┌───────────── second (0-59)
│ ┌───────────── minute (0-59)
│ │ ┌───────────── hour (0-23)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12)
│ │ │ │ │ ┌───────────── day of week (0-6)
│ │ │ │ │ │
* * * * * *

# 5-field format (without seconds):
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6)
│ │ │ │ │
* * * * *

Example

from motia import cron, step, FlowContext
import datetime

def only_weekdays(input: None, ctx: FlowContext) -> bool:
    """Only run on weekdays."""
    return datetime.datetime.now().weekday() < 5

my_step = step({
    "name": "DailySummary",
    "triggers": [
        cron("0 9 * * *", condition=only_weekdays),  # 9 AM daily, weekdays only
    ],
})

async def handler(input: None, ctx: FlowContext) -> None:
    ctx.logger.info("Running daily summary")
    # Generate and send summary

state()

Create a state trigger that fires when state changes occur.
from motia import state

trigger = state()

Signature

def state(
    *,
    condition: TriggerCondition | None = None,
) -> StateTrigger
condition
TriggerCondition
Function that determines if the trigger should fire: (input, ctx) -> bool. Typically used to filter by scope/key.

Example

from motia import state, step, FlowContext, StateTriggerInput

def only_user_changes(input: StateTriggerInput, ctx: FlowContext) -> bool:
    """Only trigger for user state changes."""
    return input.group_id == "users"

my_step = step({
    "name": "OnUserStateChange",
    "triggers": [
        state(condition=only_user_changes),
    ],
})

async def handler(input: StateTriggerInput, ctx: FlowContext) -> None:
    ctx.logger.info(
        f"User {input.item_id} changed from {input.old_value} to {input.new_value}"
    )
    # React to state change

StateTriggerInput properties

type
'state'
Always "state" for state triggers.
group_id
string
The state scope/group that changed.
item_id
string
The key within the scope that changed.
old_value
Any
The previous value before the change.
new_value
Any
The new value after the change.

stream()

Create a stream trigger that fires on stream events (create, update, delete).
from motia import stream

trigger = stream("todo")

Signature

def stream(
    stream_name: str,
    *,
    group_id: str | None = None,
    item_id: str | None = None,
    condition: TriggerCondition | None = None,
) -> StreamTrigger
stream_name
string
required
Name of the stream to subscribe to.
group_id
string
Filter to only receive events for a specific group.
item_id
string
Filter to only receive events for a specific item within a group.
condition
TriggerCondition
Function that determines if the trigger should fire: (input, ctx) -> bool.

Example

from motia import stream, step, FlowContext, StreamTriggerInput

def only_creates(input: StreamTriggerInput, ctx: FlowContext) -> bool:
    """Only trigger on create events."""
    return input.event.type == "create"

my_step = step({
    "name": "OnTodoCreated",
    "triggers": [
        stream(
            "todo",
            group_id="user-123",  # Only for this user
            condition=only_creates,
        ),
    ],
})

async def handler(input: StreamTriggerInput, ctx: FlowContext) -> None:
    ctx.logger.info(
        f"New todo created: {input.id} in group {input.group_id}"
    )
    ctx.logger.info(f"Data: {input.event.data}")
    # Process new todo

StreamTriggerInput properties

type
'stream'
Always "stream" for stream triggers.
stream_name
string
Name of the stream that fired the event.
group_id
string
The group ID where the event occurred.
id
string
The item ID that changed.
timestamp
int
Unix timestamp (milliseconds) when the event occurred.
event
StreamEvent
Event details with type ("create", "update", or "delete") and data.

Trigger conditions

All trigger types support an optional condition parameter that filters when the trigger fires.

Signature

TriggerCondition = Callable[[Any, FlowContext], bool | Awaitable[bool]]
Condition functions receive the input data and flow context, and return a boolean (or async boolean).

Example with multiple conditions

from motia import http, queue, step, FlowContext, ApiRequest

def is_admin(request: ApiRequest, ctx: FlowContext) -> bool:
    """Check if user has admin role."""
    headers = request.headers
    return headers.get("x-user-role") == "admin"

def is_high_priority(data: dict, ctx: FlowContext) -> bool:
    """Check if queue message is high priority."""
    return data.get("priority") == "high"

my_step = step({
    "name": "AdminActions",
    "triggers": [
        http("POST", "/admin/actions", condition=is_admin),
        queue("admin.actions", condition=is_high_priority),
    ],
})

TriggerInfo

The ctx.trigger property provides information about which trigger fired:
type
'http' | 'queue' | 'cron' | 'state' | 'stream'
The type of trigger that fired.
index
int
Index of the trigger in the triggers list.
path
string
For HTTP triggers, the matched path.
method
string
For HTTP triggers, the HTTP method.
topic
string
For queue triggers, the topic name.
expression
string
For cron triggers, the cron expression.

Example

async def handler(input, ctx: FlowContext) -> None:
    if ctx.trigger.type == "http":
        ctx.logger.info(f"HTTP {ctx.trigger.method} {ctx.trigger.path}")
    elif ctx.trigger.type == "queue":
        ctx.logger.info(f"Queue message from {ctx.trigger.topic}")

Build docs developers (and LLMs) love