Streams provide distributed state management with built-in event triggering. Data is organized into groups (like user IDs or tenant IDs) and items within those groups. Changes to stream data automatically trigger subscribed steps.
Creating a stream
from motia import Stream

# Simple stream: only a name is given
todo_stream = Stream("todo")

# Stream with configuration
from motia import StreamConfig

todo_stream = Stream(
    StreamConfig(
        name="todo",
        description="Todo list items",
        # JSON Schema describing the shape of each stored item
        schema={
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "completed": {"type": "boolean"},
            },
        },
    )
)
Type signature
class Stream(Generic[TData]):
def __init__(self, config: StreamConfig | str) -> None
config
StreamConfig | str
required
Stream configuration or stream name string. If a string is provided, a basic StreamConfig is created automatically.
Methods
get()
Retrieve an item from the stream.
async def get(self, group_id: str, item_id: str) -> TData | None
group_id (str): The group identifier (e.g., user ID, tenant ID).
item_id (str): The item identifier within the group.
Returns: The stored item data, or None if not found.
Example
from motia import Stream, FlowContext
todo_stream = Stream("todo")


async def handler(input, ctx: FlowContext) -> None:
    """Fetch a single todo from the stream and log whether it exists."""
    todo = await todo_stream.get("user-123", "todo-1")

    # A missing item (or any falsy value) is reported as not found
    if not todo:
        ctx.logger.info("Todo not found")
        return

    ctx.logger.info(f"Found todo: {todo['title']}")
set()
Store or update an item in the stream.
async def set(self, group_id: str, item_id: str, data: TData) -> Any
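group_id (str): The group identifier (e.g., user ID, tenant ID).
item_id (str): The item identifier within the group.
data (TData): The data to store. Must be JSON-serializable.
Returns: Result from the stream operation.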
Calling set() triggers stream event listeners with a create or update event type.
Example
from motia import Stream, FlowContext, ApiRequest, ApiResponse, http, step
todo_stream = Stream("todo")

my_step = step({
    "name": "CreateTodo",
    "triggers": [http("POST", "/todos")],
})


async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a todo for the example user and respond with its ID.

    The todo ID is derived from the first eight characters of the trace ID.
    """
    # Local import keeps this snippet self-contained
    from datetime import datetime, timezone

    body = request.body or {}
    todo_id = f"todo-{ctx.trace_id[:8]}"

    # Create todo in stream
    await todo_stream.set("user-123", todo_id, {
        "id": todo_id,
        "title": body.get("title"),
        "completed": False,
        # Timezone-aware ISO 8601 string, so the item stays JSON-serializable
        "created_at": datetime.now(timezone.utc).isoformat(),
    })

    ctx.logger.info("Todo created", {"todo_id": todo_id})
    return ApiResponse(status=201, body={"id": todo_id})
delete()
Remove an item from the stream.
async def delete(self, group_id: str, item_id: str) -> None
group_id (str): The group identifier (e.g., user ID, tenant ID).
item_id (str): The item identifier within the group.
Calling delete() triggers stream event listeners with a delete event type.
Example
from motia import Stream, FlowContext
todo_stream = Stream("todo")


async def handler(input, ctx: FlowContext) -> None:
    """Remove one todo from the example user's group and log the deletion."""
    await todo_stream.delete("user-123", "todo-1")
    ctx.logger.info("Todo deleted")
update()
Update an item using atomic update operations.
async def update(
self,
group_id: str,
item_id: str,
ops: list[dict[str, Any]]
) -> Any
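group_id (str): The group identifier (e.g., user ID, tenant ID).
item_id (str): The item identifier within the group.
ops (list[dict[str, Any]]): List of update operations. Each operation is a dict with op, path, and optionally value.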
Update operations
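Operations use the JSON Patch (RFC 6902) format of op, path, and value. The add, remove, and replace operations are standard JSON Patch; increment is an extension that RFC 6902 does not define: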
{"op": "add", "path": "/field", "value": ...} - Add or replace a field
{"op": "remove", "path": "/field"} - Remove a field
{"op": "replace", "path": "/field", "value": ...} - Replace a field value
{"op": "increment", "path": "/count", "value": 1} - Increment a numeric field
Example
from motia import Stream, FlowContext
todo_stream = Stream("todo")


async def handler(input, ctx: FlowContext) -> None:
    """Apply two separate atomic updates to one todo, then log."""
    # Mark the todo as completed (sets the field to True; it does not toggle)
    await todo_stream.update("user-123", "todo-1", [
        {"op": "replace", "path": "/completed", "value": True},
    ])

    # Increment a counter
    await todo_stream.update("user-123", "todo-1", [
        {"op": "increment", "path": "/view_count", "value": 1},
    ])

    ctx.logger.info("Todo updated")
list() / get_group()
List all items in a group. list() and get_group() are aliases: both take the same argument and return the same result.
async def list(self, group_id: str) -> list[TData]
async def get_group(self, group_id: str) -> list[TData]
group_id (str): The group identifier to list items from.
Returns: List of all items in the group.
Example
from motia import Stream, FlowContext, ApiRequest, ApiResponse, http, step
todo_stream = Stream("todo")

my_step = step({
    "name": "ListTodos",
    "triggers": [http("GET", "/todos")],
})


async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Return every todo in the example user's group together with a count."""
    todos = await todo_stream.list("user-123")
    count = len(todos)

    ctx.logger.info(f"Found {count} todos")
    return ApiResponse(status=200, body={
        "todos": todos,
        "count": count,
    })
list_groups()
List all group IDs that have items in the stream.
async def list_groups(self) -> list[str]
List of all group identifiers.
Example
from motia import Stream, FlowContext
todo_stream = Stream("todo")


async def handler(input, ctx: FlowContext) -> None:
    """Log all group IDs in the stream, then the number of todos in each."""
    groups = await todo_stream.list_groups()
    ctx.logger.info(f"Users with todos: {groups}")
    # Example output: ["user-123", "user-456", "user-789"]

    # Get todo count per user (one list() call per group)
    for group_id in groups:
        todos = await todo_stream.list(group_id)
        ctx.logger.info(f"User {group_id}: {len(todos)} todos")
Stream triggers
Steps can be triggered when stream events occur. Use the stream() trigger factory:
from motia import stream, step, FlowContext, StreamTriggerInput
my_step = step({
    "name": "OnTodoEvent",
    "triggers": [
        stream("todo"),
    ],
})


async def handler(input: StreamTriggerInput, ctx: FlowContext) -> None:
    """Log every event on the "todo" stream, then a message per event type."""
    event = input.event

    ctx.logger.info(
        f"Todo event: {event.type}",
        {
            "stream": input.stream_name,
            "group": input.group_id,
            "item": input.id,
            "data": event.data,
        },
    )

    # Event types other than create/update/delete are ignored here
    if event.type == "create":
        ctx.logger.info("New todo created")
    elif event.type == "update":
        ctx.logger.info("Todo updated")
    elif event.type == "delete":
        ctx.logger.info("Todo deleted")
Filter by group or item
from motia import stream, step
# Only watch specific group
# Variant 1: react only to events in one group
my_step = step(
    {
        "name": "OnUserTodoEvent",
        "triggers": [stream("todo", group_id="user-123")],
    }
)

# Variant 2: react only to events for one item within that group
my_step = step(
    {
        "name": "OnSpecificTodoEvent",
        "triggers": [stream("todo", group_id="user-123", item_id="todo-1")],
    }
)
Conditional triggers
from motia import stream, step, FlowContext, StreamTriggerInput
def only_creates(input: StreamTriggerInput, ctx: FlowContext) -> bool:
    """Return True only for stream events of type "create"."""
    return input.event.type == "create"
def high_priority(input: StreamTriggerInput, ctx: FlowContext) -> bool:
    """Return True when the event data is a dict whose "priority" is "high"."""
    data = input.event.data
    # Non-dict data never matches, so .get() is only called on a dict
    return isinstance(data, dict) and data.get("priority") == "high"
# Fire only when both predicates accept the event
my_step = step(
    {
        "name": "OnHighPriorityTodoCreated",
        "triggers": [
            stream(
                "todo",
                condition=lambda i, c: only_creates(i, c) and high_priority(i, c),
            ),
        ],
    }
)
See Triggers for more information.
StreamConfig
Advanced stream configuration with event handlers and schema:
from typing import Any

from motia import FlowContext, Stream, StreamConfig, StreamSubscription


async def on_join(sub: StreamSubscription, ctx: FlowContext, auth_ctx: Any) -> None:
    """Called when a client subscribes to the stream."""
    ctx.logger.info(f"Client joined group {sub.group_id}")


async def on_leave(sub: StreamSubscription, ctx: FlowContext, auth_ctx: Any) -> None:
    """Called when a client unsubscribes from the stream."""
    ctx.logger.info(f"Client left group {sub.group_id}")


stream_config = StreamConfig(
    name="todo",
    description="Todo list items",
    # JSON Schema for stored items; only "title" is mandatory
    schema={
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "completed": {"type": "boolean"},
        },
        "required": ["title"],
    },
    on_join=on_join,
    on_leave=on_leave,
)

todo_stream = Stream(stream_config)
StreamConfig properties
description: Human-readable description of the stream.
schema: JSON schema for validating stream data.
Base configuration options.
on_join: Handler called when clients subscribe. Signature: async (subscription, ctx, auth_ctx) -> Any.
on_leave: Handler called when clients unsubscribe. Signature: async (subscription, ctx, auth_ctx) -> Any.
Complete example
from motia import (
step,
http,
stream as stream_trigger,
Stream,
FlowContext,
ApiRequest,
ApiResponse,
StreamTriggerInput,
)
# Create stream
todo_stream = Stream("todo")

# Step 1: Create todo via API
create_step = step({
    "name": "CreateTodo",
    "triggers": [http("POST", "/api/todos")],
})


async def create_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a todo for the calling user.

    Responds 400 when the body has no title. The user is taken from the
    "x-user-id" header, falling back to "anonymous". Responds 201 with the
    new todo's ID on success.
    """
    # Local import keeps this example self-contained
    from datetime import datetime, timezone

    body = request.body or {}
    if not body.get("title"):
        return ApiResponse(status=400, body={"error": "Title required"})

    user_id = request.headers.get("x-user-id", "anonymous")
    todo_id = f"todo-{ctx.trace_id[:8]}"

    # Store in stream (triggers stream events)
    await todo_stream.set(user_id, todo_id, {
        "id": todo_id,
        "title": body["title"],
        "completed": False,
        # Timezone-aware ISO 8601 string, so the item stays JSON-serializable
        "created_at": datetime.now(timezone.utc).isoformat(),
    })

    ctx.logger.info("Todo created", {"user_id": user_id, "todo_id": todo_id})
    return ApiResponse(status=201, body={"id": todo_id})


create_step.handle(create_handler)
# Step 2: Update todo
update_step = step({
    "name": "UpdateTodo",
    "triggers": [http("PATCH", "/api/todos/:id")],
})


async def update_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Update the "completed" and/or "title" fields of an existing todo.

    Responds 404 when the todo does not exist. Responds 200 with the result
    of the update, or with the unchanged item when the body names neither
    field.
    """
    todo_id = request.path_params.get("id")
    body = request.body or {}
    user_id = request.headers.get("x-user-id", "anonymous")

    # Check if todo exists
    existing = await todo_stream.get(user_id, todo_id)
    if not existing:
        return ApiResponse(status=404, body={"error": "Todo not found"})

    # One atomic "replace" operation per field present in the request body
    ops = [
        {"op": "replace", "path": f"/{field}", "value": body[field]}
        for field in ("completed", "title")
        if field in body
    ]
    if not ops:
        # Nothing to change: return the item as stored
        return ApiResponse(status=200, body=existing)

    updated = await todo_stream.update(user_id, todo_id, ops)
    ctx.logger.info("Todo updated", {"todo_id": todo_id})
    return ApiResponse(status=200, body=updated)


update_step.handle(update_handler)
# Step 3: List todos
list_step = step({
    "name": "ListTodos",
    "triggers": [http("GET", "/api/todos")],
})


async def list_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """List the calling user's todos, optionally filtered by completion.

    When the "completed" query parameter is present, only todos whose
    "completed" field equals it are returned. Any value other than "true"
    (case-insensitive) is treated as False.
    """
    user_id = request.headers.get("x-user-id", "anonymous")
    todos = await todo_stream.list(user_id)

    # Filter by completion status if requested
    completed = request.query_params.get("completed")
    if completed is not None:
        is_completed = completed.lower() == "true"
        todos = [t for t in todos if t.get("completed") == is_completed]

    ctx.logger.info(f"Listed {len(todos)} todos for user {user_id}")
    return ApiResponse(status=200, body={"todos": todos, "count": len(todos)})


list_step.handle(list_handler)
# Step 4: Delete todo
delete_step = step({
    "name": "DeleteTodo",
    "triggers": [http("DELETE", "/api/todos/:id")],
})


async def delete_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Delete one of the calling user's todos.

    Responds 404 when the todo does not exist, otherwise 204 with no body.
    """
    todo_id = request.path_params.get("id")
    user_id = request.headers.get("x-user-id", "anonymous")

    # Check if exists
    existing = await todo_stream.get(user_id, todo_id)
    if not existing:
        return ApiResponse(status=404, body={"error": "Todo not found"})

    # Delete from stream (triggers stream event)
    await todo_stream.delete(user_id, todo_id)

    ctx.logger.info("Todo deleted", {"todo_id": todo_id})
    return ApiResponse(status=204, body=None)


delete_step.handle(delete_handler)
# Step 5: React to stream events
def only_completed(input: StreamTriggerInput, ctx: FlowContext) -> bool:
    """Only trigger when todos are marked complete.

    Accepts "update" events whose data is a dict with "completed" set to
    exactly True; everything else is rejected.
    """
    if input.event.type != "update":
        return False

    data = input.event.data
    # "is True" rejects truthy non-boolean values such as 1
    return isinstance(data, dict) and data.get("completed") is True
event_step = step({
    "name": "OnTodoCompleted",
    "triggers": [
        stream_trigger("todo", condition=only_completed),
    ],
    "enqueues": ["todo.completed"],
})


async def event_handler(input: StreamTriggerInput, ctx: FlowContext) -> None:
    """Log a completed todo and enqueue a "todo.completed" message for it."""
    data = input.event.data

    ctx.logger.info(
        f"Todo {input.id} completed by user {input.group_id}",
        {"data": data},
    )

    # Send notification; the trigger condition only passes dict data,
    # so .get() is safe here
    await ctx.enqueue({
        "topic": "todo.completed",
        "data": {
            "user_id": input.group_id,
            "todo_id": input.id,
            "title": data.get("title"),
        },
    })


event_step.handle(event_handler)