EventBus: Decoupled Step Notifications in FlowState
EventBus API, @handler decorator, EventPriority, StepCompleted and StepFailed payload shapes, and the memory handler factory for pipeline event storage.
The EventBus is a synchronous, priority-ordered dispatch system that lets the orchestrator announce what happened without knowing what anyone does with that information. The orchestrator emits a StepCompleted or StepFailed event after each tool step. Memory handlers registered on the bus pick those events up and store the results in memory.db. Adding new side effects — logging, alerting, analytics — means registering a new handler, not editing the orchestrator.
```python
# Register a handler for a specific event type string
bus.on("step.completed", my_handler, priority=EventPriority.HIGH)

# Auto-register a @handler-decorated callable using its event_types attribute
bus.register(handler_fn)

# Register an error handler that fires when a dispatch raises
bus.on_error(my_error_handler)
```
bus.register() is what the orchestrator uses after create_memory_handlers() returns a list of decorated callables.
emit() dispatches an event to all matching handlers (specific type + wildcard "*") sorted by priority. Exceptions in individual handlers are caught and forwarded to error handlers so one broken handler cannot block others:
```python
def emit(self, event: Event) -> list[Any]:
    if self._keep_history:
        self._history.append(event)
    handlers = self._registry.get_all_handlers(event)
    results: list[Any] = []
    for h in handlers:
        try:
            result = h(event)
            results.append(result)
        except Exception as exc:
            logger.exception("Handler %s failed for %s", h, event.event_type)
            self._dispatch_error(event, exc)
            results.append(None)
    return results
```
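The isolation behavior is easier to see in a small stand-alone sketch. `MiniBus` below is a hypothetical stand-in written for illustration, not FlowState's EventBus. It mirrors only the try/except shape of emit() shown above, and the names `MiniBus`, `broken`, and `healthy` are assumptions.

```python
from typing import Any, Callable


class MiniBus:
    """Hypothetical stand-in that mirrors only the error handling of emit()."""

    def __init__(self) -> None:
        self.handlers: list[Callable[[dict], Any]] = []
        self.error_handlers: list[Callable[[dict, Exception], None]] = []

    def emit(self, event: dict) -> list[Any]:
        results: list[Any] = []
        for h in self.handlers:
            try:
                results.append(h(event))
            except Exception as exc:
                # Forward the failure, then continue with the next handler.
                for on_error in self.error_handlers:
                    on_error(event, exc)
                results.append(None)
        return results


def broken(event: dict) -> None:
    raise RuntimeError("boom")


def healthy(event: dict) -> str:
    return f"stored {event['tool']}"


errors: list[str] = []
bus = MiniBus()
bus.handlers += [broken, healthy]
bus.error_handlers.append(lambda event, exc: errors.append(str(exc)))

results = bus.emit({"tool": "strategy"})
```

In this sketch the failing handler contributes `None` to the result list and its exception reaches the error handler, while `healthy` still runs and returns its value.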
```python
bus.history           # list of emitted events (only populated when keep_history=True)
bus.registered_types  # list of event type strings that have handlers
bus.clear()           # remove all handlers and history
```
Passing a callable to bus.register() that has not first been wrapped with @handler(...) raises ValueError: Handler has no event_types attribute — use @handler(). The decorator is what attaches the event_types attribute that bus.register() reads.
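One plausible way the decorator and the registration check could fit together is sketched below. This is an assumption for illustration, not FlowState's implementation: the `handler` and `register` functions here are hypothetical stand-ins, and only the attribute name `event_types` and the error message are taken from the text above.

```python
from typing import Any, Callable


def handler(*event_types: str, priority: int = 0) -> Callable:
    """Hypothetical decorator factory: tags a callable with its event types."""

    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        fn.event_types = event_types
        fn.priority = priority
        return fn

    return decorate


def register(fn: Callable[..., Any]) -> tuple[str, ...]:
    """Hypothetical check modeled on the error message quoted above."""
    event_types = getattr(fn, "event_types", None)
    if not event_types:
        raise ValueError("Handler has no event_types attribute — use @handler()")
    return event_types


@handler("step.completed")
def tagged(event: Any) -> None:
    pass


def untagged(event: Any) -> None:
    pass


registered = register(tagged)

try:
    register(untagged)
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

Here `tagged` registers cleanly because the decorator attached `event_types`, while `untagged` triggers the ValueError.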
create_memory_handlers(memory, root, run_id) is a factory that closes over the MemoryStore instance and returns two decorated handler functions ready to be passed to bus.register().
Registered for "step.completed" at EventPriority.AUDIT:
```python
@handler("step.completed", priority=EventPriority.AUDIT)
def on_step_completed(event: Event) -> None:
    tool_name = event.payload.get("tool", "")
    artifacts = event.payload.get("artifacts", [])
    kind = TOOL_TO_KIND.get(tool_name, MemoryKind.INSIGHT)
    for artifact_path in artifacts:
        path = Path(artifact_path)
        if not path.is_absolute():
            path = root / path
        if not path.exists() or not path.is_file():
            continue
        content = path.read_text(errors="replace")[:MAX_ARTIFACT_CHARS]
        sections = _split_sections(content)
        ...
        store.add_many(entries)
```
_split_sections(text) uses a regex to split the artifact file on ## headings (level-2 markdown). Each (heading, body) pair becomes a separate MemoryEntry in memory.db. Text before the first heading is stored under the key "Overview". This granular storage ensures FTS5 searches return specific relevant sections rather than entire documents.
Memory kind mapping:
Failure entries are stored under MemoryKind.TOOL_RUN so future runs can inject context like “strategy failed previously due to CLI not found” into adapter prompts.
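To make the section splitting performed by _split_sections concrete, here is a minimal sketch of one way to split on level-2 headings. It is not the FlowState source: the function name `split_sections`, the exact regex, and the choice to skip an empty preamble are assumptions. Only the "## heading" rule and the "Overview" key come from the description above.

```python
import re

# Matches a level-2 markdown heading line such as "## Risks" (but not "###").
_H2 = re.compile(r"^##[ \t]+(.+?)[ \t]*$", re.MULTILINE)


def split_sections(text: str) -> list[tuple[str, str]]:
    """Return (heading, body) pairs; text before the first heading is "Overview"."""
    sections: list[tuple[str, str]] = []
    matches = list(_H2.finditer(text))

    # Anything ahead of the first "## " heading goes under "Overview".
    preamble_end = matches[0].start() if matches else len(text)
    preamble = text[:preamble_end].strip()
    if preamble:
        sections.append(("Overview", preamble))

    for i, m in enumerate(matches):
        # A section body runs until the next heading or the end of the text.
        body_end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        sections.append((m.group(1), text[m.end():body_end].strip()))
    return sections


doc = "Intro line.\n\n## Goals\nShip it.\n\n## Risks\nCLI not found.\n"
sections = split_sections(doc)
# sections == [("Overview", "Intro line."), ("Goals", "Ship it."), ("Risks", "CLI not found.")]
```

Each tuple would then map to one stored entry, which is what lets a full-text search return a single section instead of the whole artifact.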
```python
from flowstate.events.bus import EventBus
from flowstate.events.handler import handler
from flowstate.events.event import EventPriority, Event


@handler("step.completed", priority=EventPriority.NORMAL)
def log_completed(event: Event) -> None:
    tool = event.payload.get("tool")
    artifacts = event.payload.get("artifacts", [])
    print(f"[audit] {tool} completed — {len(artifacts)} artifact(s)")


@handler("step.failed", priority=EventPriority.HIGH)
def alert_on_failure(event: Event) -> None:
    tool = event.payload.get("tool")
    error = event.payload.get("error")
    # send_alert(f"FlowState: {tool} failed — {error}")
    print(f"[alert] {tool} BLOCKED: {error}")


# Register on an existing bus (e.g. in a test or CLI extension)
bus = EventBus()
bus.register(log_completed)
bus.register(alert_on_failure)
```
Use EventBus(keep_history=True) in tests to assert which events were emitted and in what order without relying on side effects like file writes or database entries.
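The shape of such a test might look like the sketch below. So that it runs without FlowState installed, it uses hypothetical stand-ins: `RecordingBus`, this `Event` dataclass, and `run_pipeline` are assumptions that imitate only the keep_history flag and the history list described above. The tool name "research" is also invented for the example.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    """Hypothetical event shape: a type string plus a payload dict."""
    event_type: str
    payload: dict[str, Any] = field(default_factory=dict)


class RecordingBus:
    """Hypothetical stand-in exposing only keep_history and history."""

    def __init__(self, keep_history: bool = False) -> None:
        self._keep_history = keep_history
        self._history: list[Event] = []

    @property
    def history(self) -> list[Event]:
        return list(self._history)

    def emit(self, event: Event) -> None:
        if self._keep_history:
            self._history.append(event)


def run_pipeline(bus: RecordingBus) -> None:
    """Toy orchestrator: one step succeeds, the next fails."""
    bus.emit(Event("step.completed", {"tool": "research"}))
    bus.emit(Event("step.failed", {"tool": "strategy", "error": "CLI not found"}))


def test_emits_events_in_order() -> None:
    bus = RecordingBus(keep_history=True)
    run_pipeline(bus)
    # Assert on the recorded events, not on files or database rows.
    assert [e.event_type for e in bus.history] == ["step.completed", "step.failed"]
    assert bus.history[1].payload["tool"] == "strategy"


test_emits_events_in_order()
```

Against the real bus, the same assertions would presumably target EventBus(keep_history=True) and bus.history directly.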
The HandlerRegistry supports wildcard registration via bus.on("*", my_handler) — this handler will receive every event type emitted on the bus, sorted into the priority queue alongside type-specific handlers.
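The merge of wildcard and type-specific handlers can be sketched as follows. This is an illustration under stated assumptions, not the HandlerRegistry source: `MiniRegistry` and `Priority` are hypothetical, the numeric priority values are invented, and the sketch assumes that lower values run first and that ties fall back to registration order. It also looks handlers up by event type string, whereas the emit() excerpt passes the event object.

```python
from enum import IntEnum
from typing import Callable


class Priority(IntEnum):
    """Hypothetical values; this sketch assumes lower numbers run first."""
    HIGH = 0
    NORMAL = 50
    AUDIT = 100


class MiniRegistry:
    def __init__(self) -> None:
        self._handlers: dict[str, list[tuple[int, int, Callable]]] = {}
        self._counter = 0  # registration order, used to break priority ties

    def on(self, event_type: str, fn: Callable,
           priority: Priority = Priority.NORMAL) -> None:
        self._handlers.setdefault(event_type, []).append((priority, self._counter, fn))
        self._counter += 1

    def get_all_handlers(self, event_type: str) -> list[Callable]:
        # Merge type-specific and wildcard entries, then sort them together.
        merged = list(self._handlers.get(event_type, []))
        if event_type != "*":
            merged += self._handlers.get("*", [])
        return [fn for _, _, fn in sorted(merged, key=lambda entry: entry[:2])]


calls: list[str] = []
registry = MiniRegistry()
registry.on("step.completed", lambda e: calls.append("store"), Priority.AUDIT)
registry.on("*", lambda e: calls.append("trace"), Priority.NORMAL)
registry.on("step.completed", lambda e: calls.append("alert"), Priority.HIGH)

for fn in registry.get_all_handlers("step.completed"):
    fn({"tool": "strategy"})
# calls == ["alert", "trace", "store"]
```

The wildcard handler lands between the two type-specific handlers because all three are sorted in one list, not dispatched in separate passes.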