Skip to main content
The FlowContext object is passed as the second parameter to all step handlers. It provides access to state management, logging, event enqueueing, stream access, and trigger information.

Type signature

class FlowContext(BaseModel, Generic[TEnqueueData]):
    enqueue: Enqueuer
    trace_id: str
    state: InternalStateManager
    logger: Logger
    streams: dict[str, Stream[Any]]
    trigger: TriggerInfo
    input_value: Any

Properties

enqueue
Enqueuer
Function to enqueue messages to topics: async (data) -> None.
trace_id
string
Unique identifier for tracking this execution across steps.
state
InternalStateManager
State manager for persistent key-value storage. See StateManager.
logger
Logger
Structured logger for output. Use logger.info(), logger.error(), etc.
streams
dict[str, Stream]
Dictionary of initialized streams by name. See Streams.
trigger
TriggerInfo
Information about which trigger fired this execution.
input_value
Any
Raw input value received by the handler. Type depends on trigger type.

Methods

is_queue()

Check if the current trigger is a queue trigger.
def is_queue(self) -> bool
return
bool
True if trigger type is "queue", otherwise False.

Example

async def handler(input, ctx: FlowContext) -> None:
    if ctx.is_queue():
        ctx.logger.info("Processing queue message")

is_api()

Check if the current trigger is an HTTP API request.
def is_api(self) -> bool
return
bool
True if trigger type is "http", otherwise False.

Example

async def handler(input, ctx: FlowContext) -> Any:
    if ctx.is_api():
        return ApiResponse(status=200, body={"message": "OK"})

is_cron()

Check if the current trigger is a cron schedule.
def is_cron(self) -> bool
return
bool
True if trigger type is "cron", otherwise False.

is_state()

Check if the current trigger is a state change.
def is_state(self) -> bool
return
bool
True if trigger type is "state", otherwise False.

is_stream()

Check if the current trigger is a stream event.
def is_stream(self) -> bool
return
bool
True if trigger type is "stream", otherwise False.

get_data()

Extract the data payload from input, normalized across trigger types.
def get_data(self) -> Any
return
Any
  • For HTTP triggers: returns request.body
  • For queue triggers: returns the queue data directly
  • For cron triggers: returns None
  • For state/stream triggers: returns the input object

Example

async def handler(input, ctx: FlowContext) -> None:
    data = ctx.get_data()  # Works for any trigger type
    if data:
        ctx.logger.info(f"Processing data: {data}")

match()

Dispatch to different handlers based on trigger type. This is the recommended pattern for multi-trigger steps.
async def match(self, handlers: dict[str, Callable]) -> Any
handlers
dict[str, Callable]
required
Dictionary mapping trigger types to handler functions. Supported keys:
  • "http" or "api": Handler for HTTP triggers
  • "queue": Handler for queue triggers
  • "cron": Handler for cron triggers
  • "state": Handler for state triggers
  • "stream": Handler for stream triggers
  • "default": Fallback handler if no type matches
return
Any
Return value from the matched handler.

Handler signatures

# HTTP handler
async def http_handler(request: ApiRequest) -> ApiResponse:
    pass

# Queue handler
async def queue_handler(data: Any) -> None:
    pass

# Cron handler (no input)
async def cron_handler() -> None:
    pass

# State handler
async def state_handler(input: StateTriggerInput) -> Any:
    pass

# Stream handler
async def stream_handler(input: StreamTriggerInput) -> Any:
    pass

# Default fallback
async def default_handler(input: Any) -> Any:
    pass

Example

from motia import step, http, queue, FlowContext, ApiRequest, ApiResponse

my_step = step({
    "name": "ProcessOrder",
    "triggers": [
        http("POST", "/orders"),
        queue("order.retry"),
    ],
})

async def handler(input, ctx: FlowContext) -> Any:
    async def handle_api(request: ApiRequest) -> ApiResponse:
        ctx.logger.info(f"API request to {ctx.trigger.path}")
        order_id = request.body.get("id")
        return ApiResponse(status=201, body={"id": order_id})
    
    async def handle_queue(data: dict) -> None:
        ctx.logger.info(f"Queue message from {ctx.trigger.topic}")
        ctx.logger.info(f"Processing order: {data['id']}")
    
    return await ctx.match({
        "http": handle_api,
        "queue": handle_queue,
    })

Enqueueing messages

Use ctx.enqueue() or the legacy ctx.emit() to send messages to queue topics.

Basic enqueue

await ctx.enqueue({
    "topic": "order.created",
    "data": {"order_id": "123", "amount": 99.99},
})

With trace ID propagation

await ctx.enqueue({
    "topic": "order.created",
    "data": {"order_id": "123", "trace_id": ctx.trace_id},
})

Legacy emit syntax

# Also supported for backwards compatibility
await ctx.emit({
    "topic": "order.created",
    "data": {"order_id": "123"},
})

Using state

Access the state manager via ctx.state:
async def handler(input, ctx: FlowContext) -> None:
    # Get a value
    user = await ctx.state.get("users", "user-123")
    
    # Set a value
    await ctx.state.set("users", "user-123", {
        "name": "Alice",
        "email": "alice@example.com",
    })
    
    # List all values in scope
    all_users = await ctx.state.list("users")
    
    # Delete a value
    await ctx.state.delete("users", "user-123")
See StateManager for full API.

Using streams

Access initialized streams via ctx.streams:
from motia import Stream

todo_stream = Stream("todo")

async def handler(input, ctx: FlowContext) -> None:
    # Access stream from context
    stream = ctx.streams.get("todo") or todo_stream
    
    # Set a value
    await stream.set("user-123", "todo-1", {
        "title": "Buy groceries",
        "completed": False,
    })
    
    # Get a value
    todo = await stream.get("user-123", "todo-1")
    
    # List all items in group
    all_todos = await stream.list("user-123")
See Streams for full API.

Logging

Use the structured logger for output:
async def handler(input, ctx: FlowContext) -> None:
    # Simple message
    ctx.logger.info("Processing started")
    
    # With structured data
    ctx.logger.info(
        "Order created",
        {"order_id": "123", "amount": 99.99}
    )
    
    # Error logging
    try:
        await process_order()
    except Exception as e:
        ctx.logger.error(f"Failed to process order: {e}")
    
    # Other levels
    ctx.logger.debug("Debug info")
    ctx.logger.warning("Warning message")
    ctx.logger.error("Error occurred")

Trigger information

Access trigger details via ctx.trigger:
async def handler(input, ctx: FlowContext) -> None:
    # Check trigger type
    ctx.logger.info(f"Triggered by: {ctx.trigger.type}")
    
    # HTTP-specific info
    if ctx.is_api():
        ctx.logger.info(f"Path: {ctx.trigger.path}")
        ctx.logger.info(f"Method: {ctx.trigger.method}")
    
    # Queue-specific info
    if ctx.is_queue():
        ctx.logger.info(f"Topic: {ctx.trigger.topic}")
    
    # Cron-specific info
    if ctx.is_cron():
        ctx.logger.info(f"Schedule: {ctx.trigger.expression}")
    
    # Trigger index
    ctx.logger.info(f"Trigger index: {ctx.trigger.index}")

Complete example

from motia import (
    step,
    http,
    queue,
    FlowContext,
    ApiRequest,
    ApiResponse,
    Stream,
)

order_stream = Stream("orders")

config = {
    "name": "OrderProcessor",
    "triggers": [
        http("POST", "/api/orders"),
        queue("order.retry"),
    ],
    "enqueues": ["order.created", "order.failed"],
}

async def handler(input, ctx: FlowContext) -> Any:
    """Process orders from API or queue."""
    
    async def handle_api(request: ApiRequest) -> ApiResponse:
        body = request.body or {}
        
        # Validate
        if not body.get("amount"):
            return ApiResponse(status=400, body={"error": "Amount required"})
        
        # Generate ID
        order_id = f"order-{ctx.trace_id[:8]}"
        
        # Save to stream
        await order_stream.set("pending", order_id, {
            "id": order_id,
            "amount": body["amount"],
            "status": "pending",
        })
        
        # Save metadata to state
        await ctx.state.set("order-metadata", order_id, {
            "created_at": ctx.trace_id,
            "source": "api",
        })
        
        # Enqueue for processing
        await ctx.enqueue({
            "topic": "order.created",
            "data": {"order_id": order_id},
        })
        
        ctx.logger.info("Order created", {"order_id": order_id})
        return ApiResponse(status=201, body={"id": order_id})
    
    async def handle_queue(data: dict) -> None:
        order_id = data.get("order_id")
        
        try:
            # Process order
            ctx.logger.info("Processing order", {"order_id": order_id})
            
            # Update stream
            await order_stream.set("completed", order_id, {
                "id": order_id,
                "status": "completed",
            })
            
        except Exception as e:
            ctx.logger.error(f"Order processing failed: {e}")
            await ctx.enqueue({
                "topic": "order.failed",
                "data": {"order_id": order_id, "error": str(e)},
            })
    
    return await ctx.match({
        "http": handle_api,
        "queue": handle_queue,
    })

my_step = step(config, handler)

Build docs developers (and LLMs) love