Overview

Triggers define when a step executes. Motia supports HTTP/API triggers, queue triggers, cron schedules, state change triggers, and stream event triggers.

HTTP Triggers

http

from motia.triggers import http

trigger = http(method, path, **options)
Create an HTTP/API trigger configuration.
method
ApiRouteMethod
required
HTTP method: "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", or "HEAD".
path
str
required
URL path pattern. Can include path parameters like /users/:id.
body_schema
Any
Pydantic model or JSON schema for request body validation.
response_schema
dict[int, Any]
Response schemas keyed by status code. Values can be Pydantic models or JSON schemas.
query_params
list[QueryParam]
List of query parameter definitions.
middleware
list[ApiMiddleware]
List of middleware functions to process requests.
condition
Callable[[ApiRequest, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter requests.
return
ApiTrigger
HTTP trigger configuration.

Example

from motia.triggers import http
from pydantic import BaseModel

class UserInput(BaseModel):
    name: str
    email: str

class UserOutput(BaseModel):
    id: str
    name: str

trigger = http(
    "POST",
    "/api/users",
    body_schema=UserInput,
    response_schema={
        201: UserOutput,
        400: {"type": "object", "properties": {"error": {"type": "string"}}}
    },
    # auth_middleware and logging_middleware are assumed to be defined elsewhere
    middleware=[auth_middleware, logging_middleware]
)
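
To make the path parameter syntax concrete, here is a minimal sketch of how a pattern such as /users/:id could be matched against an incoming URL path to produce the path_params dictionary. It is a standalone illustration, not Motia's router: compile_path and extract_path_params are hypothetical helper names, and the real matching rules may differ.

```python
import re
from typing import Optional


def compile_path(pattern: str) -> "re.Pattern[str]":
    """Turn '/users/:id' into a regex with one named group per ':param' segment."""
    parts = []
    for segment in pattern.strip("/").split("/"):
        if segment.startswith(":"):
            # ':id' becomes a named group that matches one path segment
            parts.append(f"(?P<{segment[1:]}>[^/]+)")
        else:
            parts.append(re.escape(segment))
    return re.compile("^/" + "/".join(parts) + "/?$")


def extract_path_params(pattern: str, url_path: str) -> Optional[dict]:
    """Return the extracted parameters, or None when the path does not match."""
    match = compile_path(pattern).match(url_path)
    return match.groupdict() if match else None


print(extract_path_params("/users/:id", "/users/42"))
print(extract_path_params("/users/:id", "/orders/42"))
```

All extracted values are strings, which is consistent with the dict[str, str] type documented for path_params.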

api (Deprecated)

from motia.triggers import api
api() is deprecated. Use http() instead.

Queue Triggers

queue

from motia.triggers import queue

trigger = queue(topic, **options)
Create a queue trigger configuration.
topic
str
required
Queue topic name to subscribe to.
input
Any
Pydantic model or JSON schema for input validation.
infrastructure
InfrastructureConfig
Infrastructure configuration for queue and handler resources.
condition
Callable[[Any, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter messages.
return
QueueTrigger
Queue trigger configuration.

Example

from motia.triggers import queue
from motia.types import InfrastructureConfig, QueueConfig
from pydantic import BaseModel

class OrderEvent(BaseModel):
    order_id: str
    status: str

trigger = queue(
    "order-events",
    input=OrderEvent,
    infrastructure=InfrastructureConfig(
        queue=QueueConfig(
            type="fifo",
            max_retries=5,
            visibility_timeout=60
        )
    ),
    condition=lambda data, ctx: data.status == "completed"
)
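
The condition type allows either a plain bool or an awaitable, so a condition can be an ordinary function, a lambda, or an async function. The sketch below shows one way a dispatcher could evaluate both kinds. It is an illustration of the documented signature only: should_run is a hypothetical helper, not part of Motia, and the input here is a plain dict rather than a validated model.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Optional, Union

# Mirrors the documented shape: (input, ctx) -> bool | Awaitable[bool]
Condition = Callable[[Any, Any], Union[bool, Awaitable[bool]]]


async def should_run(condition: Optional[Condition], data: Any, ctx: Any) -> bool:
    """Return True when the handler should run for this input."""
    if condition is None:
        return True  # no condition means every input is accepted
    result = condition(data, ctx)
    if inspect.isawaitable(result):
        result = await result  # async conditions hand back an awaitable
    return bool(result)


def is_completed(data: dict, ctx: Any) -> bool:
    return data["status"] == "completed"


async def is_completed_async(data: dict, ctx: Any) -> bool:
    await asyncio.sleep(0)  # placeholder for an async lookup
    return data["status"] == "completed"


print(asyncio.run(should_run(is_completed, {"status": "completed"}, None)))
print(asyncio.run(should_run(is_completed_async, {"status": "pending"}, None)))
```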

Cron Triggers

cron

from motia.triggers import cron

trigger = cron(expression, **options)
Create a cron schedule trigger.
expression
str
required
Cron expression in the standard five-field format: minute hour day month weekday. Examples:
  • "0 0 * * *" - Daily at midnight
  • "*/15 * * * *" - Every 15 minutes
  • "0 9 * * 1-5" - Weekdays at 9 AM
condition
Callable[[None, FlowContext], bool | Awaitable[bool]]
Optional condition function to conditionally skip executions.
return
CronTrigger
Cron trigger configuration.

Example

from motia.triggers import cron

# Run daily at 2 AM
daily_trigger = cron("0 2 * * *")

# Run every hour on weekdays
weekday_trigger = cron("0 * * * 1-5")

# Run every 30 minutes
frequent_trigger = cron("*/30 * * * *")
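
To check what a five-field expression means before deploying it, a small matcher can help. The sketch below is a simplified, standalone reading of the minute hour day month weekday format: it supports *, lists, ranges, and steps, treats Sunday as weekday 0, and requires every field to match. It is not Motia's scheduler, and cron_matches and expand_field are hypothetical names.

```python
from datetime import datetime

# Allowed (min, max) range for each of the five fields, in order.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, lo: int, hi: int) -> set:
    """Expand one field ('*', '*/15', '1-5', '1,3', '7') into a set of ints."""
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True when `moment` falls on a minute selected by `expression`."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected five fields: minute hour day month weekday")
    allowed = [expand_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_RANGES)]
    # Cron counts Sunday as 0; datetime.weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    actual = [moment.minute, moment.hour, moment.day, moment.month, cron_weekday]
    return all(value in choices for value, choices in zip(actual, allowed))


# 2024-01-01 was a Monday, 2024-01-06 a Saturday.
print(cron_matches("0 9 * * 1-5", datetime(2024, 1, 1, 9, 0)))
print(cron_matches("0 9 * * 1-5", datetime(2024, 1, 6, 9, 0)))
```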

State Triggers

state

from motia.triggers import state

trigger = state(**options)
Create a state change trigger that fires when state is modified.
condition
Callable[[StateTriggerInput, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter state changes.
return
StateTrigger
State trigger configuration.

Example

from motia.triggers import state

# Trigger on any state change
any_change = state()

# Trigger only when specific conditions are met
conditional = state(
    condition=lambda input_data, ctx: (
        input_data.group_id == "users"
        # new_value is typed Any | None, so guard before calling .get()
        and (input_data.new_value or {}).get("status") == "active"
    )
)

StateTriggerInput

Input received when a state trigger fires.
type
'state'
Always "state".
group_id
str
The state scope that changed.
item_id
str
The item key that changed.
old_value
Any | None
Previous value before the change.
new_value
Any | None
New value after the change.
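
Because old_value and new_value can each be None, a condition that compares them is easier to read and test as a named function. The sketch below fires only when an item in the "users" scope transitions into the active status. FakeStateInput is a hypothetical stand-in that carries the fields listed above so the function can be exercised without the framework.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeStateInput:
    """Hypothetical stand-in with the fields documented for StateTriggerInput."""
    group_id: str
    item_id: str
    old_value: Optional[Any] = None
    new_value: Optional[Any] = None
    type: str = "state"


def became_active(input_data: Any, ctx: Any) -> bool:
    """True only when a 'users' item changes from non-active to active."""
    if input_data.group_id != "users":
        return False
    # Treat None (or any non-dict value) as an empty record.
    old = input_data.old_value if isinstance(input_data.old_value, dict) else {}
    new = input_data.new_value if isinstance(input_data.new_value, dict) else {}
    return new.get("status") == "active" and old.get("status") != "active"


change = FakeStateInput("users", "u1", {"status": "pending"}, {"status": "active"})
print(became_active(change, None))
```

A function with this signature fits the condition parameter documented for state().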

Stream Triggers

stream

from motia.triggers import stream

trigger = stream(stream_name, **options)
Create a stream event trigger that fires on stream operations.
stream_name
str
required
Name of the stream to subscribe to.
group_id
str
Optional group ID to filter events.
item_id
str
Optional item ID to filter events.
condition
Callable[[StreamTriggerInput, FlowContext], bool | Awaitable[bool]]
Optional condition function to filter events.
return
StreamTrigger
Stream trigger configuration.

Example

from motia.triggers import stream

# Subscribe to all events in a stream
all_events = stream("chat")

# Subscribe to events in a specific group
room_events = stream("chat", group_id="room-123")

# Subscribe to events for a specific item
user_events = stream("chat", group_id="room-123", item_id="user-456")

# Conditional stream trigger
create_only = stream(
    "chat",
    condition=lambda input_data, ctx: input_data.event.type == "create"
)

StreamTriggerInput

Input received when a stream trigger fires.
type
'stream'
Always "stream".
stream_name
str
Name of the stream.
group_id
str
Group identifier where the event occurred.
id
str
Item identifier.
timestamp
int
Event timestamp in milliseconds.
event
StreamEvent
Event details with type ("create", "update", or "delete") and data.
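
The fields above are enough to express most stream filters as a plain predicate. The following sketch narrows events by group, item, and event type. The dataclasses are hypothetical stand-ins for StreamTriggerInput and StreamEvent, and comparing the item filter against the id field is an assumption based on the field list.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple


@dataclass
class FakeStreamEvent:
    """Hypothetical stand-in for StreamEvent."""
    type: str  # "create", "update", or "delete"
    data: Any = None


@dataclass
class FakeStreamInput:
    """Hypothetical stand-in with the fields documented for StreamTriggerInput."""
    stream_name: str
    group_id: str
    id: str
    timestamp: int  # milliseconds
    event: FakeStreamEvent
    type: str = "stream"


def matches(
    input_data: Any,
    group_id: Optional[str] = None,
    item_id: Optional[str] = None,
    event_types: Tuple[str, ...] = ("create", "update", "delete"),
) -> bool:
    """Return True when the event passes every filter that was supplied."""
    if group_id is not None and input_data.group_id != group_id:
        return False
    if item_id is not None and input_data.id != item_id:
        return False
    return input_data.event.type in event_types


message = FakeStreamInput("chat", "room-123", "user-456", 1700000000000,
                          FakeStreamEvent("create", {"text": "hi"}))
print(matches(message, group_id="room-123", event_types=("create",)))
```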

Request Types

ApiRequest[TBody]

API request object for HTTP triggers.
path_params
dict[str, str]
Path parameters extracted from the URL.
query_params
dict[str, str | list[str]]
Query string parameters.
body
TBody | None
Request body, validated against body_schema if provided.
headers
dict[str, str | list[str]]
Request headers.
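
Since query_params and headers values can be either a single string or a list of strings, handler code usually needs to normalize them before use. A minimal sketch of two helpers, both hypothetical names that are not part of Motia:

```python
from typing import List, Mapping, Optional, Union

ParamValue = Union[str, List[str]]


def first_value(
    params: Mapping[str, ParamValue], name: str, default: Optional[str] = None
) -> Optional[str]:
    """Return a single string for `name`, taking the first item of a list."""
    value = params.get(name)
    if value is None:
        return default
    if isinstance(value, list):
        return value[0] if value else default
    return value


def all_values(params: Mapping[str, ParamValue], name: str) -> List[str]:
    """Return every value for `name` as a list, even when only one was sent."""
    value = params.get(name)
    if value is None:
        return []
    return list(value) if isinstance(value, list) else [value]


query = {"page": "2", "tag": ["python", "docs"]}
print(first_value(query, "page"), all_values(query, "tag"))
```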

MotiaHttpArgs[TBody]

Full HTTP request/response object for streaming responses.
request
MotiaHttpRequest[TBody]
Request details including body, headers, path/query params, and method.
response
MotiaHttpResponse
Response writer for streaming responses.

MotiaHttpResponse Methods

await response.status(200)
await response.headers({"Content-Type": "application/json"})
response.writer.stream.write(b"data")
response.close()

Response Types

ApiResponse[TOutput]

API response object returned from HTTP handlers.
status
int
required
HTTP status code.
body
Any
required
Response body data.
headers
dict[str, str]
default:"{}"
Response headers.

Example

return {
    "status": 201,
    "body": {"id": "user-123", "name": "Alice"},
    "headers": {"X-Request-ID": ctx.trace_id}
}
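
When several handlers return the same dictionary shape, a small builder keeps the three keys consistent and applies the documented empty default for headers. This is a sketch that assumes handlers may return plain dictionaries, as the example above does. make_response is a hypothetical helper, not part of Motia.

```python
from typing import Any, Dict, Optional


def make_response(
    status: int, body: Any, headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """Build a response dictionary with status, body, and headers keys."""
    if not 100 <= status <= 599:
        raise ValueError(f"not a valid HTTP status code: {status}")
    # Copy the headers so later edits do not leak back into the caller's dict.
    return {"status": status, "body": body, "headers": dict(headers or {})}


print(make_response(201, {"id": "user-123", "name": "Alice"}))
```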

Middleware

ApiMiddleware

Middleware function signature for HTTP triggers.
from typing import Awaitable, Callable

from motia.types import ApiRequest, ApiResponse, FlowContext

async def my_middleware(
    req: ApiRequest,
    ctx: FlowContext,
    next: Callable[[], Awaitable[ApiResponse]]
) -> ApiResponse:
    # Pre-processing
    ctx.logger.info(f"Request to {ctx.trigger.path}")
    
    # Call next middleware/handler
    response = await next()
    
    # Post-processing
    response.headers["X-Powered-By"] = "Motia"
    
    return response
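
The order in which middleware runs follows from how each function wraps next. The sketch below composes a list of middleware around a handler with plain asyncio so the pre-processing and post-processing order can be observed. It is a standalone illustration: compose is a hypothetical helper, responses are plain dictionaries, and Motia's own chaining may differ in detail.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Response = Dict[str, Any]
Next = Callable[[], Awaitable[Response]]
Middleware = Callable[[Any, Any, Next], Awaitable[Response]]
Handler = Callable[[Any, Any], Awaitable[Response]]


def compose(middlewares: List[Middleware], handler: Handler) -> Handler:
    """Wrap `handler` so middlewares run in list order, outermost first."""
    async def run(req: Any, ctx: Any) -> Response:
        async def dispatch(index: int) -> Response:
            if index == len(middlewares):
                return await handler(req, ctx)  # end of the chain
            # Each middleware receives a `next` that continues the chain.
            return await middlewares[index](req, ctx, lambda: dispatch(index + 1))
        return await dispatch(0)
    return run


async def record_order(req: Any, ctx: list, next: Next) -> Response:
    ctx.append("before")      # pre-processing
    response = await next()
    ctx.append("after")       # post-processing
    return response


async def add_header(req: Any, ctx: list, next: Next) -> Response:
    response = await next()
    response.setdefault("headers", {})["X-Powered-By"] = "Motia"
    return response


async def handler(req: Any, ctx: list) -> Response:
    ctx.append("handler")
    return {"status": 200, "body": {"ok": True}}


calls: list = []
result = asyncio.run(compose([record_order, add_header], handler)(None, calls))
print(calls, result["headers"])
```

In this sketch the ctx argument is just a list used to record call order; in a real step it would be the FlowContext.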

Usage Examples

Multi-Trigger Step

from motia import step, StepConfig
from motia.triggers import http, queue, cron

# process_event and cleanup_task are assumed to be defined by the application.
async def handle(input_data, ctx):
    # Route to the branch that matches whichever trigger fired.
    return await ctx.match({
        "http": lambda req: {"status": 200, "body": {"ok": True}},
        "queue": lambda data: process_event(data),
        "cron": lambda: cleanup_task(),
    })

my_step = step(
    StepConfig(
        name="multi-trigger-step",
        triggers=[
            http("GET", "/health"),
            queue("events"),
            cron("0 * * * *")
        ]
    ),
    handler=handle
)
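
The example above routes on trigger type through ctx.match. To show the idea in isolation, here is a hypothetical stand-in that dispatches on a trigger type string. It is not how ctx.match is implemented: match_trigger and its calling convention (cron branches take no argument, other branches take the payload) are assumptions drawn from the lambdas in the example.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


async def match_trigger(
    trigger_type: str, payload: Any, handlers: Dict[str, Callable[..., Any]]
) -> Any:
    """Call the branch registered for `trigger_type` and return its result."""
    branch = handlers.get(trigger_type)
    if branch is None:
        raise KeyError(f"no branch registered for trigger type {trigger_type!r}")
    # Cron fires without input, so its branch is called with no arguments.
    result = branch() if trigger_type == "cron" else branch(payload)
    if inspect.isawaitable(result):
        result = await result  # branches may be sync or async
    return result


async def cleanup_task() -> str:
    return "cleaned"


branches = {
    "http": lambda req: {"status": 200, "body": {"ok": True}},
    "queue": lambda data: f"processed {data['id']}",
    "cron": lambda: cleanup_task(),
}

print(asyncio.run(match_trigger("queue", {"id": "evt-1"}, branches)))
```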

Conditional Triggers

from motia.triggers import http, queue

# Only process premium users
queue_trigger = queue(
    "user-events",
    condition=lambda data, ctx: data.get("tier") == "premium"
)

# Only handle requests that carry the expected API key header
api_trigger = http(
    "POST",
    "/webhook",
    condition=lambda req, ctx: req.headers.get("X-API-Key") == "secret"
)
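
Comparing a header against a literal string works for a demo, but a real webhook would normally read the expected key from configuration and compare it in constant time. A minimal sketch under those assumptions follows. The WEBHOOK_API_KEY environment variable and both function names are hypothetical, and the header lookup assumes the exact key "X-API-Key" as in the example above.

```python
import hmac
import os
from typing import Any, List, Mapping, Union


def has_valid_api_key(
    headers: Mapping[str, Union[str, List[str]]], expected: str
) -> bool:
    """Compare the X-API-Key header with `expected` in constant time."""
    value = headers.get("X-API-Key", "")
    if isinstance(value, list):  # header values may arrive as a list
        value = value[0] if value else ""
    return hmac.compare_digest(value.encode(), expected.encode())


def webhook_condition(req: Any, ctx: Any) -> bool:
    """Condition function: accept only requests with the configured key."""
    expected = os.environ.get("WEBHOOK_API_KEY", "")
    # Reject everything when no key is configured instead of matching "".
    return bool(expected) and has_valid_api_key(req.headers, expected)


print(has_valid_api_key({"X-API-Key": "abc"}, "abc"))
```

A function like webhook_condition has the (request, context) shape documented for the condition parameter of http().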

Stream with Filters

from motia.triggers import stream

# React to new chat messages only
chat_trigger = stream(
    "chat",
    group_id="room-123",
    condition=lambda input_data, ctx: input_data.event.type == "create"
)

Related

  • Step - Creating step definitions
  • Context - Flow context for handlers
  • Motia - Runtime manager
