The step() function is the primary way to define executable workflow steps in Motia. A step combines configuration (triggers, enqueues, infrastructure) with a handler function that executes when the step is triggered.
Basic usage
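from motia import step, http, FlowContext, ApiRequest, ApiResponse

# The handler must exist before it is passed to step()
async def handler_func(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    return ApiResponse(status=201, body={"id": "order-123"})

my_step = step(
    config={
        "name": "CreateOrder",
        "triggers": [http("POST", "/orders")],
        "enqueues": ["order.created"],
    },
    handler=handler_func,
)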
Function signature
def step(
    config: StepConfig | dict[str, Any],
    handler: StepHandler | None = None,
) -> StepDefinition | StepBuilder
config
StepConfig | dict
required
Step configuration defining name, triggers, and behavior. Can be a StepConfig instance or a dictionary.
handler
StepHandler | None
Optional async handler function. If provided, step() returns a complete StepDefinition. If omitted, it returns a StepBuilder, and the handler must be attached later with .handle().
return
StepDefinition | StepBuilder
Returns StepDefinition if handler is provided, otherwise returns StepBuilder.
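The two return types can be pictured with a small stand-in. The sketch below does not import Motia. `sketch_step`, `SketchDefinition`, and `SketchBuilder` are hypothetical names that only mirror the documented behavior: passing a handler yields a definition, omitting it yields a builder. That `handle()` returns the finished definition is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional, Union

Handler = Callable[..., Awaitable[Any]]


@dataclass
class SketchDefinition:
    """Stand-in for a complete step: configuration plus handler."""
    config: dict
    handler: Handler


@dataclass
class SketchBuilder:
    """Stand-in for a step that still needs a handler."""
    config: dict

    def handle(self, handler: Handler) -> SketchDefinition:
        # Assumption: attaching the handler returns the finished definition.
        return SketchDefinition(self.config, handler)


def sketch_step(
    config: dict, handler: Optional[Handler] = None
) -> Union[SketchDefinition, SketchBuilder]:
    # Handler supplied: return the finished definition right away.
    if handler is not None:
        return SketchDefinition(config, handler)
    # Handler omitted: defer completion to builder.handle().
    return SketchBuilder(config)


async def noop(data: Any, ctx: Any) -> None:
    return None


complete = sketch_step({"name": "Demo"}, noop)
pending = sketch_step({"name": "Demo"})
finished = pending.handle(noop)
```

Checking `isinstance(result, SketchBuilder)` is one way calling code could tell the two cases apart.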
StepConfig properties
name
str
Unique identifier for the step within the flow.
triggers
list[TriggerConfig]
required
List of trigger configurations that can invoke this step. See Triggers.
enqueues
list[str | Enqueue]
default:"[]"
Topics that this step can enqueue messages to. Can be topic strings or Enqueue objects with additional configuration.
description
Human-readable description of what the step does.
flows
Flow names this step belongs to for organizational purposes.
infrastructure
Infrastructure configuration for handler execution (RAM, CPU, timeout) and queue settings.
Virtual enqueue topics used for flow dependency analysis without actual queue creation.
Virtual subscriptions used for flow dependency analysis without actual topic consumption.
Additional files to include when deploying this step.
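As an illustration of the required and default markers above, here is a minimal sketch of a check that could run on a dictionary config before it is passed along. `normalize_config` is a hypothetical helper, not part of Motia. Treating `name` as required is an assumption, since only `triggers` is marked required in the listing.

```python
from typing import Any


def normalize_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of config with the documented default applied."""
    # "triggers" is documented as required.
    if "triggers" not in config:
        raise ValueError("config is missing 'triggers'")
    # Assumption: "name" is required too, since it identifies the step.
    if "name" not in config:
        raise ValueError("config is missing 'name'")
    normalized = dict(config)
    # "enqueues" is documented with a default of [].
    normalized.setdefault("enqueues", [])
    return normalized


# The trigger value is a placeholder string, not a real TriggerConfig.
cfg = normalize_config({"name": "CreateOrder", "triggers": ["placeholder-trigger"]})
```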
Builder pattern
Use the builder pattern to separate configuration from handler definition:
from motia import step, http, FlowContext, ApiRequest, ApiResponse

# Create builder
order_step = step({
    "name": "CreateOrder",
    "triggers": [http("POST", "/orders")],
    "enqueues": ["order.created"],
})

# Attach handler later
@order_step.handle
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    return ApiResponse(status=201, body={"id": "order-123"})
StepDefinition type
The complete step definition returned when a handler is provided:
The step configuration object.
The async handler function that executes when the step is triggered.
Handler function signature
Handler functions must be async and accept input based on the trigger type:
# HTTP trigger handler
async def handler(
    request: ApiRequest,
    ctx: FlowContext
) -> ApiResponse:
    pass

# Queue trigger handler
async def handler(
    data: Any,
    ctx: FlowContext
) -> None:
    pass

# Cron trigger handler
async def handler(
    input: None,
    ctx: FlowContext
) -> None:
    pass

# State trigger handler
async def handler(
    input: StateTriggerInput,
    ctx: FlowContext
) -> Any:
    pass

# Stream trigger handler
async def handler(
    input: StreamTriggerInput,
    ctx: FlowContext
) -> Any:
    pass
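Because handlers must be async, a quick guard can catch a plain `def` before it is registered. The sketch below uses only the standard library. `ensure_async` is a hypothetical helper, and whether Motia performs a similar check itself is not stated here.

```python
import inspect
from typing import Any, Callable


def ensure_async(handler: Callable[..., Any]) -> Callable[..., Any]:
    """Raise TypeError unless handler was defined with 'async def'."""
    if not inspect.iscoroutinefunction(handler):
        raise TypeError(f"{handler.__name__} must be defined with 'async def'")
    return handler


async def good_handler(data: Any, ctx: Any) -> None:
    return None


def bad_handler(data: Any, ctx: Any) -> None:
    return None


checked = ensure_async(good_handler)  # returns the handler unchanged
```

Calling `ensure_async(bad_handler)` raises `TypeError`, since `bad_handler` is a regular function.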
Infrastructure configuration
Configure compute resources and queue behavior:
from motia import step, queue

my_step = step({
    "name": "ProcessOrder",
    "triggers": [queue("orders")],
    "infrastructure": {
        "handler": {
            "ram": 512,      # MB of memory
            "cpu": 256,      # CPU units
            "timeout": 60,   # seconds
        },
        "queue": {
            "type": "fifo",
            "max_retries": 5,
            "visibility_timeout": 30,
            "delay_seconds": 0,
        },
    },
})
HandlerConfig properties
ram
Memory allocated to the handler, in MB.
cpu
CPU units allocated to the handler.
timeout
Maximum execution time in seconds.
QueueConfig properties
type
'fifo' | 'standard'
default:"'standard'"
Queue type for message ordering guarantees.
max_retries
Maximum number of retry attempts for failed messages.
visibility_timeout
Time in seconds that a message is hidden after being received.
delay_seconds
Delay in seconds before a message becomes available.
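The queue settings can be modeled as a small validated record. This is a sketch under stated assumptions, not Motia's QueueConfig: `QueueSettingsSketch` is a hypothetical class, only the `'standard'` default for `type` comes from the listing above, and the rejection of negative values is an added assumption.

```python
from dataclasses import dataclass
from typing import Optional

ALLOWED_QUEUE_TYPES = ("fifo", "standard")


@dataclass
class QueueSettingsSketch:
    # Documented default: "standard".
    type: str = "standard"
    # Defaults for the remaining fields are not documented here,
    # so None stands for "not set" in this sketch.
    max_retries: Optional[int] = None
    visibility_timeout: Optional[int] = None  # seconds
    delay_seconds: Optional[int] = None  # seconds

    def __post_init__(self) -> None:
        if self.type not in ALLOWED_QUEUE_TYPES:
            raise ValueError(f"type must be one of {ALLOWED_QUEUE_TYPES}")
        # Assumption: counts and durations should never be negative.
        for name in ("max_retries", "visibility_timeout", "delay_seconds"):
            value = getattr(self, name)
            if value is not None and value < 0:
                raise ValueError(f"{name} must not be negative")


# Same values as the "queue" block in the infrastructure example.
settings = QueueSettingsSketch(
    type="fifo",
    max_retries=5,
    visibility_timeout=30,
    delay_seconds=0,
)
```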
Enqueue configuration
Define enqueue targets with additional options:
from motia import step, Enqueue, http

my_step = step({
    "name": "CreateOrder",
    "triggers": [http("POST", "/orders")],
    "enqueues": [
        "order.created",  # Simple string topic
        Enqueue(
            topic="high-value.order",
            label="High Value Orders",
            conditional=True,  # Only enqueue if explicitly called
        ),
    ],
})
topic
The queue topic name to enqueue to.
label
Human-readable label for the enqueue target.
conditional
If true, this enqueue target is only used when explicitly called, not automatically.
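Since `enqueues` accepts both bare strings and richer objects, tooling that reads a config may want one uniform shape. The sketch below shows one way to normalize such a mixed list. `EnqueueSketch` is a hypothetical stand-in for Motia's `Enqueue`, and its defaults (`label=None`, `conditional=False`) are assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class EnqueueSketch:
    """Stand-in for an enqueue target; not Motia's Enqueue class."""
    topic: str
    label: Optional[str] = None  # assumed default
    conditional: bool = False  # assumed default


def normalize_enqueues(
    items: list[Union[str, EnqueueSketch]]
) -> list[EnqueueSketch]:
    """Turn bare topic strings into EnqueueSketch objects."""
    return [
        EnqueueSketch(topic=item) if isinstance(item, str) else item
        for item in items
    ]


targets = normalize_enqueues([
    "order.created",
    EnqueueSketch(
        topic="high-value.order",
        label="High Value Orders",
        conditional=True,
    ),
])
topics = [target.topic for target in targets]
```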
Complete example
from typing import Any

from motia import (
    step,
    http,
    queue,
    FlowContext,
    ApiRequest,
    ApiResponse,
    Stream,
)

order_stream = Stream("orders")

config = {
    "name": "CreateOrder",
    "description": "Create a new order and enqueue for processing",
    "flows": ["orders"],
    "triggers": [
        http("POST", "/orders"),
        queue("order.retry"),
    ],
    "enqueues": ["order.created", "order.failed"],
    "infrastructure": {
        "handler": {
            "ram": 256,
            "timeout": 45,
        },
    },
}

async def handler(input_data, ctx: FlowContext) -> Any:
    """Handle order creation from API or queue."""

    # Runs when the step is invoked by the HTTP trigger
    async def handle_api(request: ApiRequest) -> ApiResponse:
        body = request.body or {}
        order_id = f"order-{ctx.trace_id[:8]}"
        # Save to stream
        await order_stream.set("pending", order_id, {
            "id": order_id,
            "description": body.get("description"),
            "amount": body.get("amount", 0),
        })
        # Enqueue for processing
        await ctx.enqueue({
            "topic": "order.created",
            "data": {"order_id": order_id},
        })
        return ApiResponse(status=201, body={"id": order_id})

    # Runs when the step is invoked by the queue trigger
    async def handle_queue(data) -> None:
        ctx.logger.info(f"Retrying order: {data}")
        await ctx.enqueue({
            "topic": "order.created",
            "data": data,
        })

    # Dispatch to the branch that matches the trigger type
    return await ctx.match({
        "http": handle_api,
        "queue": handle_queue,
    })

my_step = step(config, handler)
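The `ctx.match` call above selects a branch by trigger type. The standalone sketch below illustrates that dispatch idea without Motia. `match_trigger` is a hypothetical function that takes the trigger type and payload explicitly, because how the real context obtains them is not described here, and the handlers are simplified to plain dictionaries.

```python
import asyncio
from typing import Any, Awaitable, Callable

Branch = Callable[[Any], Awaitable[Any]]


async def match_trigger(
    trigger_type: str, payload: Any, branches: dict[str, Branch]
) -> Any:
    """Run the branch registered for trigger_type, passing it the payload."""
    branch = branches.get(trigger_type)
    if branch is None:
        raise LookupError(f"no branch registered for trigger type {trigger_type!r}")
    return await branch(payload)


async def handle_api(request: dict) -> dict:
    # Simplified response shape for the sketch.
    return {"status": 201, "body": {"id": request["id"]}}


async def handle_queue(data: dict) -> None:
    return None


branches = {"http": handle_api, "queue": handle_queue}
api_result = asyncio.run(match_trigger("http", {"id": "order-123"}, branches))
queue_result = asyncio.run(match_trigger("queue", {"order_id": "order-123"}, branches))
```

Raising on an unregistered trigger type is a design choice of this sketch. It surfaces a missing branch immediately instead of silently returning nothing.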