Overview

The EventManager provides a centralized way to register events, subscribe handlers, and process events asynchronously. It supports event queuing, error handling, and automatic exception event generation.

Features

  • Event registration and validation
  • Handler subscription with type hints
  • Asynchronous event processing
  • Error handling with automatic exception events
  • Support for Union types in handlers
  • Event queuing and batch processing

Constructor

EventManager(ignore_unknown_events: bool = True)

Parameters

ignore_unknown_events
bool
default: True
If True, events whose type is not registered are ignored instead of raising an error.

Methods

register

Register event classes for use with the event manager.
manager.register(
    *event_classes: type[BaseEvent] | type[ExceptionEvent],
    ignore_not_compatible: bool = False
)

Parameters

event_classes
type[BaseEvent] | type[ExceptionEvent]
The event classes to register. Each class must have a name ending in ‘Event’ and define a ‘type’ attribute.
ignore_not_compatible
bool
default: False
If True, log a warning for incompatible classes instead of raising an error.
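The two compatibility rules above (a class name ending in ‘Event’ and a ‘type’ attribute) can be illustrated with a small standalone sketch. It does not use or reproduce the library's code: `BaseEvent` here is a stand-in, `is_compatible` and `check_all` are hypothetical helper names, and raising `ValueError` is an assumption, since the documentation only says an error is raised.

```python
import logging

logger = logging.getLogger(__name__)


class BaseEvent:
    """Stand-in base class for this sketch; not the library's BaseEvent."""


class AgentSayEvent(BaseEvent):
    type = "agent.say"


class AgentSay(BaseEvent):  # name does not end with "Event"
    type = "agent.say"


class UntypedEvent(BaseEvent):  # no "type" attribute
    pass


def is_compatible(event_class: type) -> bool:
    """Return True if the class satisfies both rules described above."""
    return event_class.__name__.endswith("Event") and hasattr(event_class, "type")


def check_all(*event_classes: type, ignore_not_compatible: bool = False) -> list[type]:
    """Keep compatible classes; warn about or reject the rest, depending on the flag."""
    accepted = []
    for cls in event_classes:
        if is_compatible(cls):
            accepted.append(cls)
        elif ignore_not_compatible:
            logger.warning("Skipping incompatible class %s", cls.__name__)
        else:
            # ValueError is an assumption made for this sketch.
            raise ValueError(f"{cls.__name__} is not a compatible event class")
    return accepted
```

With `ignore_not_compatible=True`, `check_all(AgentSayEvent, UntypedEvent)` keeps only `AgentSayEvent`. With the default, the first incompatible class stops the call.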

Example

from vision_agents.core.events.manager import EventManager
from vision_agents.core.agents.events import AgentSayEvent
from vision_agents.core.llm.events import LLMResponseCompletedEvent

manager = EventManager()
manager.register(AgentSayEvent, LLMResponseCompletedEvent)

register_events_from_module

Register all event classes from a module.
manager.register_events_from_module(
    module,
    prefix: str = "",
    ignore_not_compatible: bool = True
)

Parameters

module
module
The Python module to scan for event classes.
prefix
str
default: ""
Optional prefix to filter event types. Only events with types starting with this prefix will be registered.
ignore_not_compatible
bool
default: True
If True, log a warning for incompatible classes instead of raising an error.
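To make the prefix filter concrete, the following sketch scans a module for classes whose ‘type’ string starts with a given prefix. It is an illustration under stated assumptions, not the library's implementation: `find_event_classes` is a hypothetical helper, the module is built on the fly so the snippet runs on its own, and the `llm.response` type string is invented for the example.

```python
import inspect
import types


def find_event_classes(module: types.ModuleType, prefix: str = "") -> list[type]:
    """Collect classes in `module` whose `type` string starts with `prefix`."""
    found = []
    # inspect.getmembers returns members sorted by name.
    for _name, obj in inspect.getmembers(module, inspect.isclass):
        event_type = getattr(obj, "type", None)
        if isinstance(event_type, str) and event_type.startswith(prefix):
            found.append(obj)
    return found


class AgentSayEvent:
    type = "agent.say"


class LLMResponseEvent:  # hypothetical event class and type string
    type = "llm.response"


# Build a throwaway module so the sketch needs no installed package.
demo_events = types.ModuleType("demo_events")
demo_events.AgentSayEvent = AgentSayEvent
demo_events.LLMResponseEvent = LLMResponseEvent

agent_only = find_event_classes(demo_events, prefix="agent")
everything = find_event_classes(demo_events)
```

Here `agent_only` contains just `AgentSayEvent`, while the empty default prefix matches every class that has a string ‘type’.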

Example

from vision_agents.core.agents import events as agent_events
from vision_agents.core.llm import events as llm_events

manager.register_events_from_module(agent_events, prefix="agent")
manager.register_events_from_module(llm_events, prefix="plugin")

subscribe

Subscribe a function to handle specific event types.
@manager.subscribe
async def handler(event: EventType):
    ...
The function must have type hints indicating which event types it handles. Supports both single event types and Union types for handling multiple event types.

Example

from vision_agents.core.agents.events import (
    AgentSayCompletedEvent,
    AgentSayEvent,
    AgentSayStartedEvent,
)

# Single event type
@manager.subscribe
async def handle_say(event: AgentSayEvent):
    print(f"Agent wants to say: {event.text}")

# Multiple event types using Union
@manager.subscribe
async def handle_speech_events(event: AgentSayStartedEvent | AgentSayCompletedEvent):
    print(f"Speech event: {event.type}")
Note: handlers must be coroutine functions, so define every handler with async def.

unsubscribe

Unsubscribe a function from all event types.
manager.unsubscribe(function)

Parameters

function
callable
The function to unsubscribe from all event types.

Example

@manager.subscribe
async def speech_handler(event: AgentSayEvent):
    print(f"Agent wants to say: {event.text}")

# Later, unsubscribe the handler
manager.unsubscribe(speech_handler)

send

Send one or more events for processing.
manager.send(*events)
Events are added to the queue and will be processed by the background processing task. If an event handler raises an exception, an ExceptionEvent is automatically created and queued for processing.

Parameters

events
BaseEvent | dict
One or more event objects or dictionaries to send. Events can be instances of registered event classes or dictionaries with a ‘type’ field that matches a registered event type.

Example

from vision_agents.core.agents.events import AgentSayEvent
from vision_agents.core.llm.events import LLMRequestStartedEvent

# Send single event
manager.send(AgentSayEvent(
    plugin_name="agent",
    text="Hello, world!"
))

# Send multiple events
manager.send(
    AgentSayEvent(plugin_name="agent", text="Hello"),
    LLMRequestStartedEvent(plugin_name="openai", model="gpt-4", streaming=True)
)

# Send event from dictionary
manager.send({
    "type": "agent.say",
    "plugin_name": "agent",
    "text": "Hello from dict"
})
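The dictionary form relies on the ‘type’ field to pick a registered class. A minimal standalone sketch of that lookup follows, including how a flag like `ignore_unknown_events` could change the outcome for unregistered types. `REGISTRY` and `event_from_dict` are hypothetical names, the dataclass is a stand-in for the real event, and raising `KeyError` is an assumption.

```python
from dataclasses import dataclass


@dataclass
class AgentSayEvent:
    """Stand-in for the real event class, with the fields used above."""

    plugin_name: str
    text: str
    type: str = "agent.say"


# Maps each registered type string to its event class.
REGISTRY = {"agent.say": AgentSayEvent}


def event_from_dict(payload: dict, ignore_unknown_events: bool = True):
    """Build an event instance from a dict, or return None for unknown types."""
    data = dict(payload)  # copy so the caller's dict is not modified
    event_type = data.pop("type", None)
    event_class = REGISTRY.get(event_type)
    if event_class is None:
        if ignore_unknown_events:
            return None
        # KeyError is an assumption made for this sketch.
        raise KeyError(f"Unknown event type: {event_type!r}")
    return event_class(**data)


event = event_from_dict(
    {"type": "agent.say", "plugin_name": "agent", "text": "Hello from dict"}
)
```

Here `event` is an `AgentSayEvent` instance equal to one constructed directly with the same fields.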

wait

Wait for all queued events to be processed.
await manager.wait(timeout: float = 10.0)
This is useful in tests to ensure events are processed before assertions.

Parameters

timeout
float
default: 10.0
Maximum time to wait for processing to complete (in seconds).
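A bounded wait of this kind can be sketched with `asyncio.Queue.join()` wrapped in `asyncio.wait_for`. This is a generic asyncio pattern shown for illustration, not the library's code. `wait_until_drained` is a hypothetical helper, and returning False on timeout is a choice made for this sketch: the documentation does not say what `wait` does when the timeout expires.

```python
import asyncio


async def wait_until_drained(queue: asyncio.Queue, timeout: float = 10.0) -> bool:
    """Return True if every queued item was marked done within `timeout` seconds."""
    try:
        await asyncio.wait_for(queue.join(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple[bool, list[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list[str] = []

    async def worker() -> None:
        # Background task: take items off the queue and mark them done.
        while True:
            item = await queue.get()
            seen.append(item)
            queue.task_done()

    task = asyncio.create_task(worker())
    queue.put_nowait("first")
    queue.put_nowait("second")
    drained = await wait_until_drained(queue, timeout=1.0)

    # Stop the worker cleanly before leaving the event loop.
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return drained, seen
```

In a test, awaiting such a helper before the assertions ensures the worker has seen every queued item.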

has_subscribers

Check whether any handler is registered for the given event class.
manager.has_subscribers(event_class: type[BaseEvent]) -> bool

Parameters

event_class
type[BaseEvent]
The event class to check for subscribers.

Returns

has_subscribers
bool
True if at least one handler is registered for this event type.
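A check like this is typically useful for skipping work when nobody is listening, for example building an expensive payload. The sketch below shows the pattern against a tiny stand-in class. `MiniManager`, its two-argument `subscribe`, `FrameStatsEvent`, and `maybe_build_event` are all hypothetical and only mirror the `has_subscribers` signature documented above.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FrameStatsEvent:
    """Hypothetical event carrying a costly-to-compute payload."""

    mean_brightness: float


class MiniManager:
    """Tiny stand-in that only tracks handlers per event class."""

    def __init__(self) -> None:
        self._handlers: dict[type, list] = defaultdict(list)

    def subscribe(self, event_class: type, handler) -> None:
        # Simplified: the real API infers the class from type hints instead.
        self._handlers[event_class].append(handler)

    def has_subscribers(self, event_class: type) -> bool:
        return bool(self._handlers.get(event_class))


def maybe_build_event(manager: MiniManager, pixels: list[int]):
    """Compute the payload only when at least one handler would receive it."""
    if not manager.has_subscribers(FrameStatsEvent):
        return None
    return FrameStatsEvent(mean_brightness=sum(pixels) / len(pixels))
```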

silent

Silence logging for an event class.
manager.silent(event_class: type[BaseEvent])

Parameters

event_class
type[BaseEvent]
The event class to silence from logging.

merge

Merge another EventManager into this one.
manager.merge(em: EventManager)

Parameters

em
EventManager
The EventManager to merge into this one. The merged manager’s processing task will be stopped.

Complete Example

manager.py:66-114
from vision_agents.core.events.manager import EventManager
from vision_agents.core.agents.events import (
    AgentSayEvent,
    AgentSayStartedEvent,
    AgentSayCompletedEvent
)
from vision_agents.core.llm.events import (
    LLMRequestStartedEvent,
    LLMResponseCompletedEvent
)

# Create event manager
manager = EventManager()

# Register events
manager.register(
    AgentSayEvent,
    AgentSayStartedEvent,
    AgentSayCompletedEvent,
    LLMRequestStartedEvent,
    LLMResponseCompletedEvent
)

# Subscribe to agent events
@manager.subscribe
async def handle_say(event: AgentSayEvent):
    print(f"Agent wants to say: {event.text}")

@manager.subscribe
async def handle_say_started(event: AgentSayStartedEvent):
    print(f"Speech synthesis started: {event.synthesis_id}")

@manager.subscribe
async def handle_say_completed(event: AgentSayCompletedEvent):
    print(f"Speech completed in {event.duration_ms}ms")

# Subscribe to LLM events
@manager.subscribe
async def handle_llm_request(event: LLMRequestStartedEvent):
    print(f"LLM request started: {event.model} (streaming={event.streaming})")

@manager.subscribe
async def handle_llm_response(event: LLMResponseCompletedEvent):
    print(f"LLM response: {event.text[:100]}...")
    print(f"Latency: {event.latency_ms}ms, Tokens: {event.total_tokens}")

# Subscribe to multiple event types using Union
@manager.subscribe
async def handle_speech_lifecycle(
    event: AgentSayStartedEvent | AgentSayCompletedEvent
):
    print(f"Speech lifecycle event: {event.type}")

# Send events
manager.send(AgentSayEvent(
    plugin_name="agent",
    text="Hello, world!"
))

manager.send(LLMRequestStartedEvent(
    plugin_name="openai",
    model="gpt-4",
    streaming=True
))

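# Wait for all events to be processed.
# Note: `await` only works inside a coroutine; wrap this script in
# `async def main()` and start it with `asyncio.run(main())`.
await manager.wait()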

Error Handling

If an event handler raises an exception, the EventManager automatically creates an ExceptionEvent and queues it for processing.
from vision_agents.core.agents.events import AgentSayEvent
from vision_agents.core.events import ExceptionEvent

@manager.subscribe
async def handle_errors(event: ExceptionEvent):
    print(f"Handler error: {event.exception}")
    print(f"Failed handler: {event.handler}")

@manager.subscribe
async def problematic_handler(event: AgentSayEvent):
    raise ValueError("Something went wrong!")

# This will trigger the handle_errors handler
manager.send(AgentSayEvent(plugin_name="agent", text="Test"))
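The mechanism described here, where a failing handler produces a new event that is queued like any other, can be sketched in plain asyncio. This is an illustration under stated assumptions rather than the library's code. `SayEvent` and this `ExceptionEvent` are stand-in dataclasses (the `exception` and `handler` fields follow the example above), `run_queue` is a hypothetical dispatch loop, and not re-wrapping failures raised by exception handlers is a design choice of the sketch that avoids an endless loop.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SayEvent:
    text: str


@dataclass
class ExceptionEvent:
    """Stand-in carrying the two fields used in the example above."""

    exception: Exception
    handler: Callable[..., Any]


async def run_queue(queue: asyncio.Queue, handlers: dict[type, list]) -> None:
    """Dispatch queued events; turn handler failures into ExceptionEvents."""
    while not queue.empty():
        event = queue.get_nowait()
        for handler in handlers.get(type(event), []):
            try:
                await handler(event)
            except Exception as exc:
                # Skip re-wrapping if an exception handler itself fails.
                if not isinstance(event, ExceptionEvent):
                    queue.put_nowait(ExceptionEvent(exception=exc, handler=handler))


async def demo() -> list[str]:
    log: list[str] = []

    async def broken(event: SayEvent) -> None:
        raise ValueError("Something went wrong!")

    async def on_error(event: ExceptionEvent) -> None:
        log.append(f"{event.handler.__name__}: {event.exception}")

    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait(SayEvent(text="Test"))
    await run_queue(queue, {SayEvent: [broken], ExceptionEvent: [on_error]})
    return log
```

Running `asyncio.run(demo())` records one entry naming the failing handler and its message, which mirrors what `handle_errors` prints in the example above.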

Location

vision_agents/core/events/manager.py:49-568
