Skip to main content
Streams provide distributed state management with built-in event triggering. Data is organized into groups (like user IDs or tenant IDs) and items within those groups. Changes to stream data automatically trigger subscribed steps.

Creating a stream

from motia import Stream

# Simple stream
todo_stream = Stream("todo")

# Stream with configuration
from motia import StreamConfig

# `schema` is a JSON Schema document describing each item stored in the stream.
todo_stream = Stream(StreamConfig(
    name="todo",
    description="Todo list items",
    schema={
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "completed": {"type": "boolean"},
        },
    },
))

Type signature

class Stream(Generic[TData]):
    def __init__(self, config: StreamConfig | str) -> None
config
StreamConfig | string
required
Stream configuration or stream name string. If a string is provided, a basic StreamConfig is created automatically.

Methods

get()

Retrieve an item from the stream.
async def get(self, group_id: str, item_id: str) -> TData | None
group_id
string
required
The group identifier (e.g., user ID, tenant ID).
item_id
string
required
The item identifier within the group.
return
TData | None
The stored item data, or None if not found.

Example

from motia import Stream, FlowContext

todo_stream = Stream("todo")

async def handler(input, ctx: FlowContext) -> None:
    """Fetch one todo from the stream and log whether it was found."""
    item = await todo_stream.get("user-123", "todo-1")

    # get() yields None for a missing item, so bail out early in that case.
    if not item:
        ctx.logger.info("Todo not found")
        return

    ctx.logger.info(f"Found todo: {item['title']}")

set()

Store or update an item in the stream.
async def set(self, group_id: str, item_id: str, data: TData) -> Any
group_id
string
required
The group identifier.
item_id
string
required
The item identifier within the group.
data
TData
required
The data to store. Must be JSON-serializable.
return
Any
Result from the stream operation.
Calling set() triggers stream event listeners with a create or update event type.

Example

from motia import Stream, FlowContext, ApiRequest, ApiResponse, http, step

todo_stream = Stream("todo")

my_step = step({
    "name": "CreateTodo",
    "triggers": [http("POST", "/todos")],
})

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a todo in the stream from the request body.

    The todo id is derived from the first 8 characters of the trace id.
    Returns a 201 response whose body carries the new todo id.
    """
    # Local import keeps this snippet self-contained.
    from datetime import datetime, timezone

    body = request.body or {}

    todo_id = f"todo-{ctx.trace_id[:8]}"

    # Create todo in stream
    await todo_stream.set("user-123", todo_id, {
        "id": todo_id,
        "title": body.get("title"),
        "completed": False,
        # Store a real UTC timestamp (ISO 8601 string, JSON-serializable)
        # rather than the trace id.
        "created_at": datetime.now(timezone.utc).isoformat(),
    })

    ctx.logger.info("Todo created", {"todo_id": todo_id})
    return ApiResponse(status=201, body={"id": todo_id})

delete()

Remove an item from the stream.
async def delete(self, group_id: str, item_id: str) -> None
group_id
string
required
The group identifier.
item_id
string
required
The item identifier within the group.
return
None
No return value.
Calling delete() triggers stream event listeners with a delete event type.

Example

from motia import Stream, FlowContext

todo_stream = Stream("todo")

async def handler(input, ctx: FlowContext) -> None:
    """Delete a single todo from the stream and log the deletion."""
    group_id, item_id = "user-123", "todo-1"
    await todo_stream.delete(group_id, item_id)
    ctx.logger.info("Todo deleted")

update()

Update an item using atomic update operations.
async def update(
    self,
    group_id: str,
    item_id: str,
    ops: list[dict[str, Any]]
) -> Any
group_id
string
required
The group identifier.
item_id
string
required
The item identifier within the group.
ops
list[dict]
required
List of update operations. Each operation is a dict with op, path, and optionally value.
return
Any
The updated item data.

Update operations

Supported operations follow JSON Patch (RFC 6902) semantics for add, remove, and replace; increment is an additional operation that is not part of RFC 6902:
  • {"op": "add", "path": "/field", "value": ...} - Add or replace a field
  • {"op": "remove", "path": "/field"} - Remove a field
  • {"op": "replace", "path": "/field", "value": ...} - Replace a field value
  • {"op": "increment", "path": "/count", "value": 1} - Increment a numeric field

Example

from motia import Stream, FlowContext

todo_stream = Stream("todo")

async def handler(input, ctx: FlowContext) -> None:
    """Apply two separate update() calls to the same todo, then log."""
    # Mark the todo as completed (always sets the flag to True; not a toggle)
    await todo_stream.update("user-123", "todo-1", [
        {"op": "replace", "path": "/completed", "value": True},
    ])

    # Increment a counter
    await todo_stream.update("user-123", "todo-1", [
        {"op": "increment", "path": "/view_count", "value": 1},
    ])

    ctx.logger.info("Todo updated")

list() / get_group()

List all items in a group. list() and get_group() are aliases of each other and behave identically.
async def list(self, group_id: str) -> list[TData]
async def get_group(self, group_id: str) -> list[TData]
group_id
string
required
The group identifier to list items from.
return
list[TData]
List of all items in the group.

Example

from motia import Stream, FlowContext, ApiRequest, ApiResponse, http, step

todo_stream = Stream("todo")

my_step = step({
    "name": "ListTodos",
    "triggers": [http("GET", "/todos")],
})

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Return every todo in the "user-123" group together with a count."""
    items = await todo_stream.list("user-123")
    total = len(items)

    ctx.logger.info(f"Found {total} todos")

    payload = {
        "todos": items,
        "count": total,
    }
    return ApiResponse(status=200, body=payload)

list_groups()

List all group IDs that have items in the stream.
async def list_groups(self) -> list[str]
return
list[str]
List of all group identifiers.

Example

from motia import Stream, FlowContext

todo_stream = Stream("todo")

async def handler(input, ctx: FlowContext) -> None:
    """Log all group ids in the stream, then the number of todos in each group."""
    group_ids = await todo_stream.list_groups()

    ctx.logger.info(f"Users with todos: {group_ids}")
    # Example output: ["user-123", "user-456", "user-789"]

    # One list() call per group to obtain its item count.
    for gid in group_ids:
        items = await todo_stream.list(gid)
        ctx.logger.info(f"User {gid}: {len(items)} todos")

Stream triggers

Steps can be triggered when stream events occur. Use the stream() trigger factory:
from motia import stream, step, FlowContext, StreamTriggerInput

my_step = step({
    "name": "OnTodoEvent",
    "triggers": [
        stream("todo"),
    ],
})

async def handler(input: StreamTriggerInput, ctx: FlowContext) -> None:
    """Log each stream event, followed by a short message for known event types."""
    event = input.event

    headline = f"Todo event: {event.type}"
    details = {
        "stream": input.stream_name,
        "group": input.group_id,
        "item": input.id,
        "data": event.data,
    }
    ctx.logger.info(headline, details)

    # Event type -> follow-up message; anything else is logged only once above.
    known_events = (
        ("create", "New todo created"),
        ("update", "Todo updated"),
        ("delete", "Todo deleted"),
    )
    for kind, message in known_events:
        if event.type == kind:
            ctx.logger.info(message)
            break

Filter by group or item

from motia import stream, step

# Only watch specific group
my_step = step({
    "name": "OnUserTodoEvent",
    "triggers": [
        stream("todo", group_id="user-123"),
    ],
})

# Only watch specific item in group
my_step = step({
    "name": "OnSpecificTodoEvent",
    "triggers": [
        stream("todo", group_id="user-123", item_id="todo-1"),
    ],
})

Conditional triggers

from motia import stream, step, FlowContext, StreamTriggerInput

def only_creates(input: StreamTriggerInput, ctx: FlowContext) -> bool:
    """Trigger condition: accept only events whose type is "create"."""
    event_kind = input.event.type
    return event_kind == "create"

def high_priority(input: StreamTriggerInput, ctx: FlowContext) -> bool:
    """Trigger condition: accept only events whose data is a dict with priority "high"."""
    payload = input.event.data
    # Non-dict payloads (e.g. None) can never match.
    if not isinstance(payload, dict):
        return False
    return payload.get("priority") == "high"

my_step = step({
    "name": "OnHighPriorityTodoCreated",
    "triggers": [
        stream("todo", condition=lambda i, c: only_creates(i, c) and high_priority(i, c)),
    ],
})
See Triggers for more information.

StreamConfig

Advanced stream configuration with event handlers and schema:
from typing import Any

from motia import FlowContext, Stream, StreamConfig, StreamSubscription


async def on_join(sub: StreamSubscription, ctx: FlowContext, auth_ctx: Any) -> None:
    """Called when a client subscribes to the stream."""
    ctx.logger.info(f"Client joined group {sub.group_id}")


async def on_leave(sub: StreamSubscription, ctx: FlowContext, auth_ctx: Any) -> None:
    """Called when a client unsubscribes from the stream."""
    ctx.logger.info(f"Client left group {sub.group_id}")


# Full configuration: JSON Schema for item data plus subscribe/unsubscribe hooks.
stream_config = StreamConfig(
    name="todo",
    description="Todo list items",
    schema={
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "completed": {"type": "boolean"},
        },
        "required": ["title"],
    },
    on_join=on_join,
    on_leave=on_leave,
)

todo_stream = Stream(stream_config)

StreamConfig properties

name
string
required
Unique stream name.
description
string
Human-readable description of the stream.
schema
dict
JSON schema for validating stream data.
base_config
dict
Base configuration options.
on_join
async function
Handler called when clients subscribe. Signature: async (subscription, ctx, auth_ctx) -> Any.
on_leave
async function
Handler called when clients unsubscribe. Signature: async (subscription, ctx, auth_ctx) -> Any.

Complete example

from motia import (
    step,
    http,
    stream as stream_trigger,
    Stream,
    FlowContext,
    ApiRequest,
    ApiResponse,
    StreamTriggerInput,
)

# Create stream
todo_stream = Stream("todo")

# Step 1: Create todo via API
create_step = step({
    "name": "CreateTodo",
    "triggers": [http("POST", "/api/todos")],
})

async def create_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Create a todo for the calling user and store it in the todo stream.

    The user is read from the ``x-user-id`` header (default ``"anonymous"``)
    and is used as the stream group id. Returns 400 when the body has no
    title, otherwise 201 with the new todo id.
    """
    # Local import keeps this handler self-contained.
    from datetime import datetime, timezone

    body = request.body or {}

    if not body.get("title"):
        return ApiResponse(status=400, body={"error": "Title required"})

    user_id = request.headers.get("x-user-id", "anonymous")
    todo_id = f"todo-{ctx.trace_id[:8]}"

    # Store in stream (triggers stream events)
    await todo_stream.set(user_id, todo_id, {
        "id": todo_id,
        "title": body["title"],
        "completed": False,
        # Store a real UTC timestamp (ISO 8601 string, JSON-serializable)
        # rather than the trace id.
        "created_at": datetime.now(timezone.utc).isoformat(),
    })

    ctx.logger.info("Todo created", {"user_id": user_id, "todo_id": todo_id})
    return ApiResponse(status=201, body={"id": todo_id})

create_step.handle(create_handler)

# Step 2: Update todo
update_step = step({
    "name": "UpdateTodo",
    "triggers": [http("PATCH", "/api/todos/:id")],
})

async def update_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Patch the ``completed`` and/or ``title`` fields of an existing todo.

    Returns 404 when the todo does not exist, 200 with the result of
    update() when at least one field was supplied, and 200 with the
    unchanged todo otherwise.
    """
    todo_id = request.path_params.get("id")
    body = request.body or {}
    user_id = request.headers.get("x-user-id", "anonymous")

    # Check if todo exists
    current = await todo_stream.get(user_id, todo_id)
    if not current:
        return ApiResponse(status=404, body={"error": "Todo not found"})

    # One "replace" operation per updatable field present in the body.
    updatable = (("completed", "/completed"), ("title", "/title"))
    patch = [
        {"op": "replace", "path": path, "value": body[field]}
        for field, path in updatable
        if field in body
    ]

    # Nothing to change: hand back the todo as it is.
    if not patch:
        return ApiResponse(status=200, body=current)

    result = await todo_stream.update(user_id, todo_id, patch)
    ctx.logger.info("Todo updated", {"todo_id": todo_id})
    return ApiResponse(status=200, body=result)

update_step.handle(update_handler)

# Step 3: List todos
list_step = step({
    "name": "ListTodos",
    "triggers": [http("GET", "/api/todos")],
})

async def list_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """List the calling user's todos, optionally filtered by ``?completed=``."""
    user_id = request.headers.get("x-user-id", "anonymous")

    items = await todo_stream.list(user_id)

    # Filter by completion status if requested; only "true" (any case) means True.
    raw_filter = request.query_params.get("completed")
    if raw_filter is not None:
        want_completed = raw_filter.lower() == "true"
        items = [item for item in items if item.get("completed") == want_completed]

    total = len(items)
    ctx.logger.info(f"Listed {total} todos for user {user_id}")
    return ApiResponse(status=200, body={"todos": items, "count": total})

list_step.handle(list_handler)

# Step 4: Delete todo
delete_step = step({
    "name": "DeleteTodo",
    "triggers": [http("DELETE", "/api/todos/:id")],
})

async def delete_handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    """Delete one of the calling user's todos; 404 if absent, 204 on success."""
    todo_id = request.path_params.get("id")
    user_id = request.headers.get("x-user-id", "anonymous")

    found = await todo_stream.get(user_id, todo_id)
    if found:
        # Delete from stream (triggers stream event)
        await todo_stream.delete(user_id, todo_id)

        ctx.logger.info("Todo deleted", {"todo_id": todo_id})
        return ApiResponse(status=204, body=None)

    return ApiResponse(status=404, body={"error": "Todo not found"})

delete_step.handle(delete_handler)

# Step 5: React to stream events
def only_completed(input: StreamTriggerInput, ctx: FlowContext) -> bool:
    """Only trigger when todos are marked complete.

    Accepts "update" events whose data is a dict with ``completed`` set to
    exactly ``True``; everything else is rejected.
    """
    event = input.event
    if event.type == "update":
        payload = event.data
        if isinstance(payload, dict):
            return payload.get("completed") is True
    return False

event_step = step({
    "name": "OnTodoCompleted",
    "triggers": [
        stream_trigger("todo", condition=only_completed),
    ],
    "enqueues": ["todo.completed"],
})

async def event_handler(input: StreamTriggerInput, ctx: FlowContext) -> None:
    """Log a completed todo and enqueue a "todo.completed" notification."""
    user_id = input.group_id
    todo_id = input.id
    payload = input.event.data

    ctx.logger.info(
        f"Todo {todo_id} completed by user {user_id}",
        {"data": payload},
    )

    # Send notification
    notification = {
        "topic": "todo.completed",
        "data": {
            "user_id": user_id,
            "todo_id": todo_id,
            "title": payload.get("title"),
        },
    }
    await ctx.enqueue(notification)

event_step.handle(event_handler)

Build docs developers (and LLMs) love