Overview
The EventManager provides a centralized way to register events, subscribe handlers, and process events asynchronously. It supports event queuing, error handling, and automatic exception event generation.
Features
- Event registration and validation
- Handler subscription with type hints
- Asynchronous event processing
- Error handling with automatic exception events
- Support for Union types in handlers
- Event queuing and batch processing
Constructor
EventManager(ignore_unknown_events: bool = True)
Parameters
ignore_unknown_events
bool
If True, unknown events are ignored rather than raising errors.
Methods
register
Register event classes for use with the event manager.
manager.register(
*event_classes: type[BaseEvent] | type[ExceptionEvent],
ignore_not_compatible: bool = False
)
Parameters
event_classes
type[BaseEvent] | type[ExceptionEvent]
The event classes to register. Must have names ending with ‘Event’ and a ‘type’ attribute.
ignore_not_compatible
bool
If True, log a warning instead of raising an error for incompatible classes.
Example
from vision_agents.core.events.manager import EventManager
from vision_agents.core.agents.events import AgentSayEvent
from vision_agents.core.llm.events import LLMResponseCompletedEvent
manager = EventManager()
manager.register(AgentSayEvent, LLMResponseCompletedEvent)
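The two requirements on registered classes (a name ending in ‘Event’ and a ‘type’ attribute) can be pictured as a simple check. This is a standalone sketch, not the library's validation code; `GreetingEvent`, `Greeting`, and `is_compatible` are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass


@dataclass
class GreetingEvent:
    # Hypothetical event class: the name ends with "Event" and it has a `type` attribute.
    type: str = "demo.greeting"
    text: str = ""


class Greeting:
    # Hypothetical incompatible class: wrong name suffix and no `type` attribute.
    pass


def is_compatible(event_class: type) -> bool:
    """Return True if the class follows the naming and `type` conventions."""
    return event_class.__name__.endswith("Event") and hasattr(event_class, "type")


print(is_compatible(GreetingEvent))  # True
print(is_compatible(Greeting))       # False
```

With `ignore_not_compatible=True`, a class like `Greeting` would presumably be skipped with a logged warning rather than raising an error.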
register_events_from_module
Register all event classes from a module.
manager.register_events_from_module(
module,
prefix: str = "",
ignore_not_compatible: bool = True
)
Parameters
module
The Python module to scan for event classes.
prefix
str
Optional prefix to filter event types. Only events whose type starts with this prefix are registered.
ignore_not_compatible
bool
If True, log a warning instead of raising an error for incompatible classes.
Example
from vision_agents.core.agents import events as agent_events
from vision_agents.core.llm import events as llm_events
manager.register_events_from_module(agent_events, prefix="agent")
manager.register_events_from_module(llm_events, prefix="plugin")
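Scanning a module with a prefix filter might look roughly like the sketch below. It uses only the standard library and is an assumption about the general approach, not the library's actual code; `demo_module`, `find_event_classes`, and the two event classes are hypothetical.

```python
import inspect
import types
from dataclasses import dataclass


@dataclass
class AgentHelloEvent:
    type: str = "agent.hello"


@dataclass
class PluginReadyEvent:
    type: str = "plugin.ready"


# Build a throwaway module so the example is self-contained.
demo_module = types.ModuleType("demo_events")
demo_module.AgentHelloEvent = AgentHelloEvent
demo_module.PluginReadyEvent = PluginReadyEvent
demo_module.helper = lambda: None  # non-class member, skipped by the scan


def find_event_classes(module: types.ModuleType, prefix: str = "") -> list[type]:
    """Collect classes named *Event whose `type` string starts with `prefix`."""
    found = []
    for name, obj in inspect.getmembers(module, inspect.isclass):
        event_type = getattr(obj, "type", None)
        if (
            name.endswith("Event")
            and isinstance(event_type, str)
            and event_type.startswith(prefix)
        ):
            found.append(obj)
    return found


print([cls.__name__ for cls in find_event_classes(demo_module, prefix="agent")])
```

An empty prefix matches every event type, which is consistent with the documented default of `prefix=""`.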
subscribe
Subscribe a function to handle specific event types.
@manager.subscribe
async def handler(event: EventType):
    ...
The function must have type hints indicating which event types it handles. Supports both single event types and Union types for handling multiple event types.
Example
from vision_agents.core.agents.events import (
    AgentSayEvent,
    AgentSayStartedEvent,
    AgentSayCompletedEvent,
)
from vision_agents.core.llm.events import LLMResponseCompletedEvent
# Single event type
@manager.subscribe
async def handle_say(event: AgentSayEvent):
    print(f"Agent wants to say: {event.text}")
# Multiple event types using Union
@manager.subscribe
async def handle_speech_events(event: AgentSayStartedEvent | AgentSayCompletedEvent):
    print(f"Speech event: {event.type}")
Handlers must be async coroutines. Use async def for all handlers.
unsubscribe
Unsubscribe a function from all event types.
manager.unsubscribe(function)
Parameters
function
The function to unsubscribe from all event types.
Example
@manager.subscribe
async def speech_handler(event: AgentSayEvent):
    print("Speech started")
# Later, unsubscribe the handler
manager.unsubscribe(speech_handler)
send
Send one or more events for processing.
Events are added to the queue and will be processed by the background processing task. If an event handler raises an exception, an ExceptionEvent is automatically created and queued for processing.
Parameters
One or more event objects or dictionaries to send. Events can be instances of registered event classes or dictionaries with a ‘type’ field that matches a registered event type.
Example
from vision_agents.core.agents.events import AgentSayEvent
from vision_agents.core.llm.events import LLMRequestStartedEvent
# Send single event
manager.send(AgentSayEvent(
    plugin_name="agent",
    text="Hello, world!"
))
# Send multiple events
manager.send(
    AgentSayEvent(plugin_name="agent", text="Hello"),
    LLMRequestStartedEvent(plugin_name="openai", model="gpt-4", streaming=True)
)
# Send event from dictionary
manager.send({
    "type": "agent.say",
    "plugin_name": "agent",
    "text": "Hello from dict"
})
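Sending a dictionary implies that the ‘type’ field is used to look up a registered class. One plausible way to do that is sketched below; the conversion the library actually performs is not described here, so `SayEvent`, `REGISTRY`, and `event_from_dict` are hypothetical stand-ins.

```python
from dataclasses import dataclass


@dataclass
class SayEvent:
    # Hypothetical stand-in for a registered event class.
    plugin_name: str
    text: str
    type: str = "agent.say"


# Maps an event's type string to its class, as a registry might.
REGISTRY = {"agent.say": SayEvent}


def event_from_dict(payload: dict, registry: dict[str, type]):
    """Build an event instance from a dict using its 'type' field."""
    data = dict(payload)            # copy so the caller's dict is untouched
    event_type = data.pop("type")   # raises KeyError if 'type' is missing
    event_class = registry[event_type]
    return event_class(**data)


event = event_from_dict(
    {"type": "agent.say", "plugin_name": "agent", "text": "Hello from dict"},
    REGISTRY,
)
print(event)
```

In this sketch an unregistered type raises `KeyError`. A manager created with `ignore_unknown_events=True` would presumably skip such an event instead.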
wait
Wait for all queued events to be processed.
await manager.wait(timeout: float = 10.0)
This is useful in tests to ensure events are processed before assertions.
Parameters
timeout
float
Maximum time to wait for processing to complete (in seconds).
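The idea of waiting for a queue to drain, with an upper bound on the wait, can be sketched with `asyncio.Queue.join()` and `asyncio.wait_for()`. This is an illustration of the pattern only and makes no claim about how `wait` is implemented internally; `drain`, `wait_until_processed`, and `demo` are hypothetical names.

```python
import asyncio
import contextlib


async def drain(queue: asyncio.Queue, processed: list) -> None:
    """Background worker: consume items forever and mark each one done."""
    while True:
        item = await queue.get()
        processed.append(item)
        queue.task_done()


async def wait_until_processed(queue: asyncio.Queue, timeout: float = 10.0) -> None:
    """Block until every queued item is marked done, or raise on timeout."""
    await asyncio.wait_for(queue.join(), timeout=timeout)


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    worker = asyncio.create_task(drain(queue, processed))
    for item in ("a", "b", "c"):
        queue.put_nowait(item)
    await wait_until_processed(queue, timeout=1.0)
    # Stop the background worker cleanly before returning.
    worker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker
    return processed


print(asyncio.run(demo()))
```

In a test, the equivalent call would sit between sending events and making assertions, so that handlers have run before results are checked.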
has_subscribers
Check whether any handler is registered for the given event class.
manager.has_subscribers(event_class: type[BaseEvent]) -> bool
Parameters
event_class
type[BaseEvent]
The event class to check for subscribers.
Returns
True if at least one handler is registered for this event type.
silent
Silence logging for an event class.
manager.silent(event_class: type[BaseEvent])
Parameters
event_class
type[BaseEvent]
The event class to silence from logging.
merge
Merge another EventManager into this one.
manager.merge(em: EventManager)
Parameters
em
EventManager
The EventManager to merge into this one. The merged manager’s processing task will be stopped.
Complete Example
from vision_agents.core.events.manager import EventManager
from vision_agents.core.agents.events import (
    AgentSayEvent,
    AgentSayStartedEvent,
    AgentSayCompletedEvent
)
from vision_agents.core.llm.events import (
    LLMRequestStartedEvent,
    LLMResponseCompletedEvent
)
# Create event manager
manager = EventManager()
# Register events
manager.register(
    AgentSayEvent,
    AgentSayStartedEvent,
    AgentSayCompletedEvent,
    LLMRequestStartedEvent,
    LLMResponseCompletedEvent
)
# Subscribe to agent events
@manager.subscribe
async def handle_say(event: AgentSayEvent):
    print(f"Agent wants to say: {event.text}")

@manager.subscribe
async def handle_say_started(event: AgentSayStartedEvent):
    print(f"Speech synthesis started: {event.synthesis_id}")

@manager.subscribe
async def handle_say_completed(event: AgentSayCompletedEvent):
    print(f"Speech completed in {event.duration_ms}ms")

# Subscribe to LLM events
@manager.subscribe
async def handle_llm_request(event: LLMRequestStartedEvent):
    print(f"LLM request started: {event.model} (streaming={event.streaming})")

@manager.subscribe
async def handle_llm_response(event: LLMResponseCompletedEvent):
    print(f"LLM response: {event.text[:100]}...")
    print(f"Latency: {event.latency_ms}ms, Tokens: {event.total_tokens}")

# Subscribe to multiple event types using Union
@manager.subscribe
async def handle_speech_lifecycle(
    event: AgentSayStartedEvent | AgentSayCompletedEvent
):
    print(f"Speech lifecycle event: {event.type}")
# Send events
manager.send(AgentSayEvent(
    plugin_name="agent",
    text="Hello, world!"
))
manager.send(LLMRequestStartedEvent(
    plugin_name="openai",
    model="gpt-4",
    streaming=True
))
# Wait for all events to be processed
# (await must run inside a coroutine, for example one started with asyncio.run)
await manager.wait()
Error Handling
If an event handler raises an exception, the EventManager automatically creates an ExceptionEvent and queues it for processing.
from vision_agents.core.events import ExceptionEvent
@manager.subscribe
async def handle_errors(event: ExceptionEvent):
    print(f"Handler error: {event.exception}")
    print(f"Failed handler: {event.handler}")

@manager.subscribe
async def problematic_handler(event: AgentSayEvent):
    raise ValueError("Something went wrong!")
# This will trigger the handle_errors handler
manager.send(AgentSayEvent(plugin_name="agent", text="Test"))
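The mechanism described above, where a failing handler produces a follow-up exception event, can be sketched in a few lines of plain asyncio. This is a simplified model under stated assumptions, not the EventManager's implementation: `DemoEvent`, `DemoExceptionEvent`, and `dispatch` are hypothetical. The guard against wrapping errors raised by exception handlers is a design choice of this sketch, added to avoid an endless loop.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DemoEvent:
    text: str


@dataclass
class DemoExceptionEvent:
    exception: Exception
    handler: Callable[..., Any]


async def dispatch(event, handlers: dict[type, list]) -> list:
    """Process one event plus any exception events its handlers produce."""
    pending = [event]
    seen = []
    while pending:
        current = pending.pop(0)
        seen.append(current)
        for handler in handlers.get(type(current), []):
            try:
                await handler(current)
            except Exception as exc:
                # Do not wrap failures of exception handlers themselves.
                if not isinstance(current, DemoExceptionEvent):
                    pending.append(DemoExceptionEvent(exception=exc, handler=handler))
    return seen


errors: list[str] = []


async def failing(event: DemoEvent) -> None:
    raise ValueError("Something went wrong!")


async def on_error(event: DemoExceptionEvent) -> None:
    errors.append(str(event.exception))


handlers = {DemoEvent: [failing], DemoExceptionEvent: [on_error]}
seen = asyncio.run(dispatch(DemoEvent("Test"), handlers))
print(errors)
```

The original event and the generated exception event both pass through the same dispatch path, so error handlers are subscribed like any other handler.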
Location
vision_agents/core/events/manager.py:49-568