Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/8BitTacoSupreme/flowstate/llms.txt

Use this file to discover all available pages before exploring further.

The EventBus is a synchronous, priority-ordered dispatch system that lets the orchestrator announce what happened without knowing what anyone does with that information. The orchestrator emits a StepCompleted or StepFailed event after each tool step. Memory handlers registered on the bus pick those events up and store the results in memory.db. Adding new side effects — logging, alerting, analytics — means registering a new handler, not editing the orchestrator.

Event Infrastructure

The event system lives in flowstate/events/ and is composed of four modules:

event.py

Defines the Event base model and all concrete domain events (StepCompleted, StepFailed, PipelineStarted, etc.)

bus.py

EventBus — dispatches events to registered handlers in priority order with error isolation

handler.py

@handler decorator and EventHandler protocol — marks functions with event_types and priority metadata

registry.py

HandlerRegistry — stores and sorts handlers by priority, supports wildcard "*" subscriptions

Event Base Class (event.py)

All events are immutable Pydantic models extending Event:
class Event(BaseModel):
    event_type: str
    payload: dict[str, Any] = Field(default_factory=dict)
    event_id: str = Field(default_factory=lambda: uuid4().hex[:12])
    timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))
    source: str = ""
    metadata: dict[str, Any] = Field(default_factory=dict)

    model_config = {"frozen": True}
frozen=True means events cannot be mutated after creation. Use event.with_metadata(**kwargs) to get a copy with additional metadata merged in.

EventPriority

Handlers execute in priority order — lower integer runs first:
class EventPriority(IntEnum):
    CRITICAL = 0
    HIGH     = 10
    NORMAL   = 50
    LOW      = 90
    AUDIT    = 100
Memory handlers use EventPriority.AUDIT (100) so they run after any business-logic handlers at NORMAL or HIGH priority.

Concrete Domain Events

class PipelineStarted(Event):
    event_type: str = "pipeline.started"

class PipelineCompleted(Event):
    event_type: str = "pipeline.completed"

class StepStarted(Event):
    event_type: str = "step.started"

class StepCompleted(Event):
    event_type: str = "step.completed"

class StepFailed(Event):
    event_type: str = "step.failed"

class StateChanged(Event):
    event_type: str = "state.changed"
The orchestrator currently emits only StepCompleted and StepFailed. The other event types are available for future use or custom handler registration.

EventBus (bus.py)

class EventBus:
    def __init__(self, *, keep_history: bool = False) -> None:
        self._registry = HandlerRegistry()
        self._keep_history = keep_history
        self._history: list[Event] = []
        self._error_handlers: list[Callable[[Event, Exception], None]] = []

Registration methods

# Register a handler for a specific event type string
bus.on("step.completed", my_handler, priority=EventPriority.HIGH)

# Auto-register a @handler-decorated callable using its event_types attribute
bus.register(handler_fn)

# Register an error handler that fires when a dispatch raises
bus.on_error(my_error_handler)
bus.register() is what the orchestrator uses after create_memory_handlers() returns a list of decorated callables.

emit()

emit() dispatches an event to all matching handlers (specific type + wildcard "*") sorted by priority. Exceptions in individual handlers are caught and forwarded to error handlers so one broken handler cannot block others:
def emit(self, event: Event) -> list[Any]:
    if self._keep_history:
        self._history.append(event)

    handlers = self._registry.get_all_handlers(event)
    results: list[Any] = []

    for h in handlers:
        try:
            result = h(event)
            results.append(result)
        except Exception as exc:
            logger.exception("Handler %s failed for %s", h, event.event_type)
            self._dispatch_error(event, exc)
            results.append(None)

    return results

Introspection

bus.history           # list of emitted events (only populated when keep_history=True)
bus.registered_types  # list of event type strings that have handlers
bus.clear()           # remove all handlers and history

@handler Decorator (handler.py)

The @handler decorator attaches event_types and priority metadata to a function so bus.register() can auto-register it:
def handler(
    *event_types: str,
    priority: EventPriority = EventPriority.NORMAL,
) -> Callable:
    def decorator(fn: Callable) -> Callable:
        fn.event_types = list(event_types)
        fn.priority = priority

        @functools.wraps(fn)
        def wrapper(event: Event) -> Any:
            return fn(event)

        wrapper.event_types = fn.event_types
        wrapper.priority = fn.priority
        return wrapper
    return decorator
Usage:
from flowstate.events.handler import handler
from flowstate.events.event import EventPriority

@handler("step.completed", "step.failed", priority=EventPriority.HIGH)
def my_listener(event):
    print(f"{event.event_type}: {event.payload}")
A function decorated with @handler but passed to bus.register() without calling the decorator first will raise ValueError: Handler has no event_types attribute — use @handler().

Events Emitted by the Orchestrator

The orchestrator emits exactly two event types inside _run_step():

StepCompleted

Emitted when execute_fn() returns a ToolResult with success=True:
bus.emit(
    StepCompleted(
        payload={"tool": tool_name, "artifacts": result.artifacts},
        source="orchestrator",
    )
)
Payload keyTypeExample
toolstr"research"
artifactslist[str]["/path/to/research/report.md"]

StepFailed

Emitted when execute_fn() returns a ToolResult with success=False:
bus.emit(
    StepFailed(
        payload={"tool": tool_name, "error": result.error or "unknown"},
        source="orchestrator",
    )
)
Payload keyTypeExample
toolstr"strategy"
errorstr"claude CLI not found"

Memory Handlers (memory_handlers.py)

create_memory_handlers(memory, root, run_id) is a factory that closes over the MemoryStore instance and returns two decorated handler functions ready to be passed to bus.register().

on_step_completed

Registered for "step.completed" at EventPriority.AUDIT:
@handler("step.completed", priority=EventPriority.AUDIT)
def on_step_completed(event: Event) -> None:
    tool_name = event.payload.get("tool", "")
    artifacts = event.payload.get("artifacts", [])
    kind = TOOL_TO_KIND.get(tool_name, MemoryKind.INSIGHT)

    for artifact_path in artifacts:
        path = Path(artifact_path)
        if not path.is_absolute():
            path = root / path
        if not path.exists() or not path.is_file():
            continue

        content = path.read_text(errors="replace")[:MAX_ARTIFACT_CHARS]
        sections = _split_sections(content)
        ...
        store.add_many(entries)
_split_sections(text) uses a regex to split the artifact file on ## headings (level-2 markdown). Each (heading, body) pair becomes a separate MemoryEntry in memory.db. Text before the first heading is stored under the key "Overview". This granular storage ensures FTS5 searches return specific relevant sections rather than entire documents. Memory kind mapping:
TOOL_TO_KIND = {
    "research":   MemoryKind.RESEARCH,
    "strategy":   MemoryKind.STRATEGY,
    "gsd":        MemoryKind.DECISION,
    "discipline": MemoryKind.TOOL_RUN,
}

on_step_failed

Registered for "step.failed" at EventPriority.AUDIT:
@handler("step.failed", priority=EventPriority.AUDIT)
def on_step_failed(event: Event) -> None:
    tool_name = event.payload.get("tool", "")
    error = event.payload.get("error", "unknown error")

    store.add(
        MemoryEntry.create(
            MemoryKind.TOOL_RUN,
            f"Tool '{tool_name}' failed: {error}",
            f"{tool_name} failure",
            source=tool_name,
            tags=[tool_name, "failure"],
            run_id=run_id,
        )
    )
Failure entries are stored under MemoryKind.TOOL_RUN so future runs can inject context like “strategy failed previously due to CLI not found” into adapter prompts.

Full Event Flow

The complete path from tool execution to memory storage:
# 1. Orchestrator — _run_step() dispatches the adapter
result = execute_fn()   # e.g. research.execute(state.interview)

# 2. On success, orchestrator emits StepCompleted
if result.success:
    bus.emit(
        StepCompleted(
            payload={"tool": "research", "artifacts": ["/path/research/report.md"]},
            source="orchestrator",
        )
    )

# 3. EventBus dispatches to all registered handlers for "step.completed"
#    HandlerRegistry returns handlers sorted by priority
handlers = registry.get_all_handlers(event)   # [on_step_completed]

# 4. on_step_completed (priority=AUDIT=100) fires
#    reads /path/research/report.md, splits by ## headings
#    stores each section in memory.db under MemoryKind.RESEARCH

# 5. Next pipeline run — ResearchAdapter injects prior knowledge
prior = self.get_memory_context("event sourcing")
# → "## Prior Knowledge\n\n[relevant sections from memory.db]\n\n---\n\n"
prompt = prior + "\n\n---\n\n" + topic_prompt

Registering a Custom Handler

from flowstate.events.bus import EventBus
from flowstate.events.handler import handler
from flowstate.events.event import EventPriority, Event


@handler("step.completed", priority=EventPriority.NORMAL)
def log_completed(event: Event) -> None:
    tool = event.payload.get("tool")
    artifacts = event.payload.get("artifacts", [])
    print(f"[audit] {tool} completed — {len(artifacts)} artifact(s)")


@handler("step.failed", priority=EventPriority.HIGH)
def alert_on_failure(event: Event) -> None:
    tool = event.payload.get("tool")
    error = event.payload.get("error")
    # send_alert(f"FlowState: {tool} failed — {error}")
    print(f"[alert] {tool} BLOCKED: {error}")


# Register on an existing bus (e.g. in a test or CLI extension)
bus = EventBus()
bus.register(log_completed)
bus.register(alert_on_failure)
Use EventBus(keep_history=True) in tests to assert which events were emitted and in what order without relying on side effects like file writes or database entries.
The HandlerRegistry supports wildcard registration via bus.on("*", my_handler) — this handler will receive every event type emitted on the bus, sorted into the priority queue alongside type-specific handlers.

Build docs developers (and LLMs) love