Skip to main content
The step() function is the primary way to define executable workflow steps in Motia. A step combines configuration (triggers, enqueues, infrastructure) with a handler function that executes when the step is triggered.

Basic usage

from motia import step, http, FlowContext, ApiRequest, ApiResponse

my_step = step(
    config={
        "name": "CreateOrder",
        "triggers": [http("POST", "/orders")],
        "enqueues": ["order.created"],
    },
    handler=handler_func
)

Function signature

def step(
    config: StepConfig | dict[str, Any],
    handler: StepHandler | None = None,
) -> StepDefinition | StepBuilder
config
StepConfig | dict
required
Step configuration defining name, triggers, and behavior. Can be a StepConfig instance or a dictionary.
handler
async function
Optional async handler function. If provided, returns a complete StepDefinition. If omitted, returns a StepBuilder that requires calling .handle() later.
return
StepDefinition | StepBuilder
Returns StepDefinition if handler is provided, otherwise returns StepBuilder.

StepConfig properties

name
string
required
Unique identifier for the step within the flow.
triggers
list[TriggerConfig]
required
List of trigger configurations that can invoke this step. See Triggers.
enqueues
list[str | Enqueue]
default:"[]"
Topics that this step can enqueue messages to. Can be topic strings or Enqueue objects with additional configuration.
description
string
Human-readable description of what the step does.
flows
list[string]
Flow names this step belongs to for organizational purposes.
infrastructure
InfrastructureConfig
Infrastructure configuration for handler execution (RAM, CPU, timeout) and queue settings.
virtual_enqueues
list[str | Enqueue]
Virtual enqueue topics used for flow dependency analysis without actual queue creation.
virtual_subscribes
list[string]
Virtual subscriptions used for flow dependency analysis without actual topic consumption.
include_files
list[string]
Additional files to include when deploying this step.

Builder pattern

Use the builder pattern to separate configuration from handler definition:
from motia import step, http, FlowContext, ApiRequest, ApiResponse

# Create builder
order_step = step({
    "name": "CreateOrder",
    "triggers": [http("POST", "/orders")],
    "enqueues": ["order.created"],
})

# Attach handler later
@order_step.handle
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    return ApiResponse(status=201, body={"id": "order-123"})

StepDefinition type

The complete step definition returned when a handler is provided:
config
StepConfig
The step configuration object.
handler
StepHandler
The async handler function that executes when the step is triggered.

Handler function signature

Handler functions must be async and accept input based on the trigger type:
# HTTP trigger handler
async def handler(
    request: ApiRequest,
    ctx: FlowContext
) -> ApiResponse:
    pass

# Queue trigger handler
async def handler(
    data: Any,
    ctx: FlowContext
) -> None:
    pass

# Cron trigger handler
async def handler(
    input: None,
    ctx: FlowContext
) -> None:
    pass

# State trigger handler
async def handler(
    input: StateTriggerInput,
    ctx: FlowContext
) -> Any:
    pass

# Stream trigger handler
async def handler(
    input: StreamTriggerInput,
    ctx: FlowContext
) -> Any:
    pass

Infrastructure configuration

Configure compute resources and queue behavior:
from motia import step, queue

my_step = step({
    "name": "ProcessOrder",
    "triggers": [queue("orders")],
    "infrastructure": {
        "handler": {
            "ram": 512,      # MB of memory
            "cpu": 256,      # CPU units
            "timeout": 60,   # seconds
        },
        "queue": {
            "type": "fifo",
            "max_retries": 5,
            "visibility_timeout": 30,
            "delay_seconds": 0,
        },
    },
})

HandlerConfig properties

ram
int
default:"128"
Memory allocation in MB.
cpu
int
CPU units allocated to the handler.
timeout
int
default:"30"
Maximum execution time in seconds.

QueueConfig properties

type
'fifo' | 'standard'
default:"'standard'"
Queue type for message ordering guarantees.
max_retries
int
default:"3"
Maximum number of retry attempts for failed messages.
visibility_timeout
int
default:"30"
Time in seconds that a message is hidden after being received.
delay_seconds
int
default:"0"
Delay in seconds before a message becomes available.

Enqueue configuration

Define enqueue targets with additional options:
from motia import step, Enqueue, http

my_step = step({
    "name": "CreateOrder",
    "triggers": [http("POST", "/orders")],
    "enqueues": [
        "order.created",  # Simple string topic
        Enqueue(
            topic="high-value.order",
            label="High Value Orders",
            conditional=True,  # Only enqueue if explicitly called
        ),
    ],
})
topic
string
required
The queue topic name to enqueue to.
label
string
Human-readable label for the enqueue target.
conditional
bool
default:"false"
If true, this enqueue target is only used when explicitly called, not automatically.

Complete example

from motia import (
    step,
    http,
    queue,
    FlowContext,
    ApiRequest,
    ApiResponse,
    Stream,
)

order_stream = Stream("orders")

config = {
    "name": "CreateOrder",
    "description": "Create a new order and enqueue for processing",
    "flows": ["orders"],
    "triggers": [
        http("POST", "/orders"),
        queue("order.retry"),
    ],
    "enqueues": ["order.created", "order.failed"],
    "infrastructure": {
        "handler": {
            "ram": 256,
            "timeout": 45,
        },
    },
}

async def handler(input_data, ctx: FlowContext) -> Any:
    """Handle order creation from API or queue."""
    
    async def handle_api(request: ApiRequest) -> ApiResponse:
        body = request.body or {}
        order_id = f"order-{ctx.trace_id[:8]}"
        
        # Save to stream
        await order_stream.set("pending", order_id, {
            "id": order_id,
            "description": body.get("description"),
            "amount": body.get("amount", 0),
        })
        
        # Enqueue for processing
        await ctx.enqueue({
            "topic": "order.created",
            "data": {"order_id": order_id},
        })
        
        return ApiResponse(status=201, body={"id": order_id})
    
    async def handle_queue(data) -> None:
        ctx.logger.info(f"Retrying order: {data}")
        await ctx.enqueue({
            "topic": "order.created",
            "data": data,
        })
    
    return await ctx.match({
        "http": handle_api,
        "queue": handle_queue,
    })

my_step = step(config, handler)

Build docs developers (and LLMs) love