The FlowContext object is passed as the second parameter to all step handlers. It provides access to state management, logging, event enqueueing, stream access, and trigger information.
Type signature
class FlowContext(BaseModel, Generic[TEnqueueData]):
enqueue: Enqueuer
trace_id: str
state: InternalStateManager
logger: Logger
streams: dict[str, Stream[Any]]
trigger: TriggerInfo
input_value: Any
Properties
enqueue: Function to enqueue messages to topics: async (data) -> None.
trace_id: Unique identifier for tracking this execution across steps.
state: State manager for persistent key-value storage. See StateManager.
logger: Structured logger for output. Use logger.info(), logger.error(), etc.
streams: Dictionary of initialized streams by name. See Streams.
trigger: Information about which trigger fired this execution.
input_value: Raw input value received by the handler. Type depends on trigger type.
Methods
is_queue()
Check if the current trigger is a queue trigger.
def is_queue(self) -> bool
True if trigger type is "queue", otherwise False.
Example
async def handler(input, ctx: FlowContext) -> None:
if ctx.is_queue():
ctx.logger.info("Processing queue message")
is_api()
Check if the current trigger is an HTTP API request.
def is_api(self) -> bool
True if trigger type is "http", otherwise False.
Example
async def handler(input, ctx: FlowContext) -> Any:
if ctx.is_api():
return ApiResponse(status=200, body={"message": "OK"})
is_cron()
Check if the current trigger is a cron schedule.
def is_cron(self) -> bool
True if trigger type is "cron", otherwise False.
is_state()
Check if the current trigger is a state change.
def is_state(self) -> bool
True if trigger type is "state", otherwise False.
is_stream()
Check if the current trigger is a stream event.
def is_stream(self) -> bool
True if trigger type is "stream", otherwise False.
get_data()
Extract the data payload from input, normalized across trigger types.
def get_data(self) -> Any
- For HTTP triggers: returns request.body
- For queue triggers: returns the queue data directly
- For cron triggers: returns None
- For state/stream triggers: returns the input object
Example
async def handler(input, ctx: FlowContext) -> None:
data = ctx.get_data() # Works for any trigger type
if data:
ctx.logger.info(f"Processing data: {data}")
match()
Dispatch to different handlers based on trigger type. This is the recommended pattern for multi-trigger steps.
async def match(self, handlers: dict[str, Callable]) -> Any
handlers
dict[str, Callable]
required
Dictionary mapping trigger types to handler functions. Supported keys:
"http" or "api": Handler for HTTP triggers
"queue": Handler for queue triggers
"cron": Handler for cron triggers
"state": Handler for state triggers
"stream": Handler for stream triggers
"default": Fallback handler if no type matches
Return value from the matched handler.
Handler signatures
# HTTP handler
async def http_handler(request: ApiRequest) -> ApiResponse:
pass
# Queue handler
async def queue_handler(data: Any) -> None:
pass
# Cron handler (no input)
async def cron_handler() -> None:
pass
# State handler
async def state_handler(input: StateTriggerInput) -> Any:
pass
# Stream handler
async def stream_handler(input: StreamTriggerInput) -> Any:
pass
# Default fallback
async def default_handler(input: Any) -> Any:
pass
Example
from motia import step, http, queue, FlowContext, ApiRequest, ApiResponse
my_step = step({
"name": "ProcessOrder",
"triggers": [
http("POST", "/orders"),
queue("order.retry"),
],
})
async def handler(input, ctx: FlowContext) -> Any:
async def handle_api(request: ApiRequest) -> ApiResponse:
ctx.logger.info(f"API request to {ctx.trigger.path}")
order_id = request.body.get("id")
return ApiResponse(status=201, body={"id": order_id})
async def handle_queue(data: dict) -> None:
ctx.logger.info(f"Queue message from {ctx.trigger.topic}")
ctx.logger.info(f"Processing order: {data['id']}")
return await ctx.match({
"http": handle_api,
"queue": handle_queue,
})
Enqueueing messages
Use ctx.enqueue() or the legacy ctx.emit() to send messages to queue topics.
Basic enqueue
await ctx.enqueue({
"topic": "order.created",
"data": {"order_id": "123", "amount": 99.99},
})
With trace ID propagation
await ctx.enqueue({
"topic": "order.created",
"data": {"order_id": "123", "trace_id": ctx.trace_id},
})
Legacy emit syntax
# Also supported for backwards compatibility
await ctx.emit({
"topic": "order.created",
"data": {"order_id": "123"},
})
Using state
Access the state manager via ctx.state:
async def handler(input, ctx: FlowContext) -> None:
# Get a value
user = await ctx.state.get("users", "user-123")
# Set a value
await ctx.state.set("users", "user-123", {
"name": "Alice",
"email": "alice@example.com",
})
# List all values in scope
all_users = await ctx.state.list("users")
# Delete a value
await ctx.state.delete("users", "user-123")
See StateManager for full API.
Using streams
Access initialized streams via ctx.streams:
from motia import Stream
todo_stream = Stream("todo")
async def handler(input, ctx: FlowContext) -> None:
# Access stream from context
stream = ctx.streams.get("todo") or todo_stream
# Set a value
await stream.set("user-123", "todo-1", {
"title": "Buy groceries",
"completed": False,
})
# Get a value
todo = await stream.get("user-123", "todo-1")
# List all items in group
all_todos = await stream.list("user-123")
See Streams for full API.
Logging
Use the structured logger for output:
async def handler(input, ctx: FlowContext) -> None:
# Simple message
ctx.logger.info("Processing started")
# With structured data
ctx.logger.info(
"Order created",
{"order_id": "123", "amount": 99.99}
)
# Error logging
try:
await process_order()
except Exception as e:
ctx.logger.error(f"Failed to process order: {e}")
# Other levels
ctx.logger.debug("Debug info")
ctx.logger.warning("Warning message")
ctx.logger.error("Error occurred")
Trigger information
Access trigger details via ctx.trigger:
async def handler(input, ctx: FlowContext) -> None:
# Check trigger type
ctx.logger.info(f"Triggered by: {ctx.trigger.type}")
# HTTP-specific info
if ctx.is_api():
ctx.logger.info(f"Path: {ctx.trigger.path}")
ctx.logger.info(f"Method: {ctx.trigger.method}")
# Queue-specific info
if ctx.is_queue():
ctx.logger.info(f"Topic: {ctx.trigger.topic}")
# Cron-specific info
if ctx.is_cron():
ctx.logger.info(f"Schedule: {ctx.trigger.expression}")
# Trigger index
ctx.logger.info(f"Trigger index: {ctx.trigger.index}")
Complete example
from motia import (
step,
http,
queue,
FlowContext,
ApiRequest,
ApiResponse,
Stream,
)
order_stream = Stream("orders")
config = {
"name": "OrderProcessor",
"triggers": [
http("POST", "/api/orders"),
queue("order.retry"),
],
"enqueues": ["order.created", "order.failed"],
}
async def handler(input, ctx: FlowContext) -> Any:
"""Process orders from API or queue."""
async def handle_api(request: ApiRequest) -> ApiResponse:
body = request.body or {}
# Validate
if not body.get("amount"):
return ApiResponse(status=400, body={"error": "Amount required"})
# Generate ID
order_id = f"order-{ctx.trace_id[:8]}"
# Save to stream
await order_stream.set("pending", order_id, {
"id": order_id,
"amount": body["amount"],
"status": "pending",
})
# Save metadata to state
await ctx.state.set("order-metadata", order_id, {
"created_at": ctx.trace_id,
"source": "api",
})
# Enqueue for processing
await ctx.enqueue({
"topic": "order.created",
"data": {"order_id": order_id},
})
ctx.logger.info("Order created", {"order_id": order_id})
return ApiResponse(status=201, body={"id": order_id})
async def handle_queue(data: dict) -> None:
order_id = data.get("order_id")
try:
# Process order
ctx.logger.info("Processing order", {"order_id": order_id})
# Update stream
await order_stream.set("completed", order_id, {
"id": order_id,
"status": "completed",
})
except Exception as e:
ctx.logger.error(f"Order processing failed: {e}")
await ctx.enqueue({
"topic": "order.failed",
"data": {"order_id": order_id, "error": str(e)},
})
return await ctx.match({
"http": handle_api,
"queue": handle_queue,
})
my_step = step(config, handler)