Overview
LLM events track language model operations, including standard request/response cycles, streaming responses, tool executions, realtime session management, and vision-language model (VLM) inferences.
Request/Response Events
LLMRequestStartedEvent
Event emitted when an LLM request begins.
@dataclass
class LLMRequestStartedEvent(PluginBaseEvent):
    type: str = "plugin.llm_request_started"
    request_id: str
    model: Optional[str] = None
    streaming: bool = False
Fields
request_id: Unique identifier for this request. Auto-generated UUID if not provided.
model: Model being used for this request (e.g., "gpt-4", "claude-3-opus").
streaming: Whether this is a streaming request.
Example
from vision_agents.core.llm.events import LLMRequestStartedEvent
@manager.subscribe
async def track_llm_request(event: LLMRequestStartedEvent):
    print(f"LLM request {event.request_id} started")
    print(f"Model: {event.model}, Streaming: {event.streaming}")
LLMResponseChunkEvent
Event emitted when a streaming response chunk is received.
@dataclass
class LLMResponseChunkEvent(PluginBaseEvent):
    type: str = "plugin.llm_response_chunk"
    content_index: Optional[int] = None
    delta: Optional[str] = None
    item_id: Optional[str] = None
    output_index: Optional[int] = None
    sequence_number: Optional[int] = None
    is_first_chunk: bool = False
    time_to_first_token_ms: Optional[float] = None
Fields
content_index: The index of the content part that the text delta was added to.
delta: The text delta that was added in this chunk.
item_id: The ID of the output item that the text delta was added to.
output_index: The index of the output item that the text delta was added to.
sequence_number: The sequence number for this event.
is_first_chunk: Whether this is the first chunk in the stream.
time_to_first_token_ms: Time from request start to this first chunk (only set if is_first_chunk=True).
Example
from vision_agents.core.llm.events import LLMResponseChunkEvent
streaming_text = ""

@manager.subscribe
async def handle_chunk(event: LLMResponseChunkEvent):
    global streaming_text
    if event.is_first_chunk:
        print(f"First token received in {event.time_to_first_token_ms}ms")
    if event.delta:
        streaming_text += event.delta
        print(event.delta, end="", flush=True)
LLMResponseCompletedEvent
Event emitted after an LLM response is processed.
@dataclass
class LLMResponseCompletedEvent(PluginBaseEvent):
    type: str = "plugin.llm_response_completed"
    original: Any = None
    text: str = ""
    item_id: Optional[str] = None
    latency_ms: Optional[float] = None
    time_to_first_token_ms: Optional[float] = None
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    total_tokens: Optional[int] = None
    model: Optional[str] = None
Fields
original: Original response object from the LLM provider.
text: The complete response text.
item_id: Identifier for this response item.
latency_ms: Total time from request to complete response, in milliseconds.
time_to_first_token_ms: Time from request to first token received (streaming), in milliseconds.
input_tokens: Number of input/prompt tokens consumed.
output_tokens: Number of output/completion tokens generated.
total_tokens: Total tokens (input + output). May differ from the sum if caching is involved.
model: Model identifier used for this response.
Example
from vision_agents.core.llm.events import LLMResponseCompletedEvent
@manager.subscribe
async def track_completion(event: LLMResponseCompletedEvent):
    print(f"Response completed: {event.text[:100]}...")
    print(f"Model: {event.model}")
    print(f"Latency: {event.latency_ms}ms")
    print(f"Tokens: {event.input_tokens} in / {event.output_tokens} out")
    if event.time_to_first_token_ms:
        print(f"Time to first token: {event.time_to_first_token_ms}ms")
LLMErrorEvent
Event emitted when a non-realtime LLM error occurs.
@dataclass
class LLMErrorEvent(PluginBaseEvent):
    type: str = "plugin.llm_error"
    error: Optional[Exception] = None
    error_code: Optional[str] = None
    context: Optional[str] = None
    request_id: Optional[str] = None
    is_recoverable: bool = True
Fields
error: The exception that occurred.
error_code: Error code from the LLM provider.
context: Additional context about where the error occurred.
request_id: Request ID associated with this error.
is_recoverable: Whether the error is recoverable.
Properties
error_message: Human-readable error message extracted from the exception.
Example
from vision_agents.core.llm.events import LLMErrorEvent
@manager.subscribe
async def handle_llm_error(event: LLMErrorEvent):
    print(f"LLM error: {event.error_message}")
    print(f"Request ID: {event.request_id}")
    print(f"Error code: {event.error_code}")
    print(f"Recoverable: {event.is_recoverable}")
    if not event.is_recoverable:
        # Handle fatal error
        print("Fatal error - stopping agent")
Tool Events
ToolStartEvent
Event emitted when a tool execution starts.
@dataclass
class ToolStartEvent(PluginBaseEvent):
    type: str = "plugin.llm.tool.start"
    tool_name: str = ""
    arguments: Optional[Dict[str, Any]] = None
    tool_call_id: Optional[str] = None
Fields
tool_name: Name of the tool being executed.
arguments: Arguments passed to the tool.
tool_call_id: Unique identifier for this tool call.
Example
from vision_agents.core.llm.events import ToolStartEvent
@manager.subscribe
async def track_tool_start(event: ToolStartEvent):
    print(f"Tool '{event.tool_name}' started")
    print(f"Call ID: {event.tool_call_id}")
    print(f"Arguments: {event.arguments}")
ToolEndEvent
Event emitted when a tool execution ends.
@dataclass
class ToolEndEvent(PluginBaseEvent):
    type: str = "plugin.llm.tool.end"
    tool_name: str = ""
    success: bool = True
    result: Optional[Any] = None
    error: Optional[str] = None
    tool_call_id: Optional[str] = None
    execution_time_ms: Optional[float] = None
Fields
tool_name: Name of the tool that was executed.
success: Whether the tool execution was successful.
result: Result returned by the tool (if successful).
error: Error message (if failed).
tool_call_id: Unique identifier for this tool call.
execution_time_ms: Time taken to execute the tool, in milliseconds.
Example
from vision_agents.core.llm.events import ToolStartEvent, ToolEndEvent
import time

tool_timings = {}

@manager.subscribe
async def track_tool_lifecycle(event: ToolStartEvent | ToolEndEvent):
    if isinstance(event, ToolStartEvent):
        tool_timings[event.tool_call_id] = time.time()
        print(f"🔧 Tool '{event.tool_name}' started")
    elif isinstance(event, ToolEndEvent):
        # Prefer the event's own timing; fall back to our own measurement.
        elapsed_ms = event.execution_time_ms
        if elapsed_ms is None and event.tool_call_id in tool_timings:
            elapsed_ms = (time.time() - tool_timings.pop(event.tool_call_id)) * 1000
        if event.success:
            print(f"✅ Tool '{event.tool_name}' completed in {elapsed_ms}ms")
            print(f"Result: {event.result}")
        else:
            print(f"❌ Tool '{event.tool_name}' failed: {event.error}")
Realtime Session Events
RealtimeConnectedEvent
Event emitted when a realtime connection is established.
@dataclass
class RealtimeConnectedEvent(PluginBaseEvent):
    type: str = "plugin.realtime_connected"
    provider: Optional[str] = None
    session_id: Optional[str] = None
    session_config: Optional[dict[str, Any]] = None
    capabilities: Optional[list[str]] = None
Fields
provider: Provider name (e.g., "openai", "anthropic").
session_id: Unique session identifier.
session_config: Configuration for this session.
capabilities: List of capabilities supported by this session.
RealtimeDisconnectedEvent
Event emitted when a realtime connection is closed.
@dataclass
class RealtimeDisconnectedEvent(PluginBaseEvent):
    type: str = "plugin.realtime_disconnected"
    provider: Optional[str] = None
    session_id: Optional[str] = None
    reason: Optional[str] = None
    was_clean: bool = True
RealtimeAudioInputEvent
Event emitted when audio input is sent to the realtime session.
@dataclass
class RealtimeAudioInputEvent(PluginBaseEvent):
    type: str = "plugin.realtime_audio_input"
    data: Optional[PcmData] = None
RealtimeAudioOutputEvent
Event emitted when audio output is received from the realtime session.
@dataclass
class RealtimeAudioOutputEvent(PluginBaseEvent):
    type: str = "plugin.realtime_audio_output"
    data: Optional[PcmData] = None
    response_id: Optional[str] = None
RealtimeResponseEvent
Event emitted when the realtime session provides a response.
@dataclass
class RealtimeResponseEvent(PluginBaseEvent):
    type: str = "plugin.realtime_response"
    original: Optional[str] = None
    text: Optional[str] = None
    response_id: str
    is_complete: bool = True
    conversation_item_id: Optional[str] = None
RealtimeErrorEvent
Event emitted when a realtime error occurs.
@dataclass
class RealtimeErrorEvent(PluginBaseEvent):
    type: str = "plugin.realtime_error"
    error: Optional[Exception] = None
    error_code: Optional[str] = None
    context: Optional[str] = None
    is_recoverable: bool = True
RealtimeUserSpeechTranscriptionEvent
Event emitted when a user speech transcription is available from the realtime session.
@dataclass
class RealtimeUserSpeechTranscriptionEvent(PluginBaseEvent):
    type: str = "plugin.realtime_user_speech_transcription"
    text: str = ""
    original: Optional[Any] = None
RealtimeAgentSpeechTranscriptionEvent
Event emitted when an agent speech transcription is available from the realtime session.
@dataclass
class RealtimeAgentSpeechTranscriptionEvent(PluginBaseEvent):
    type: str = "plugin.realtime_agent_speech_transcription"
    text: str = ""
    original: Optional[Any] = None
Vision-Language Model (VLM) Events
VLMInferenceStartEvent
Event emitted when a VLM inference starts.
@dataclass
class VLMInferenceStartEvent(PluginBaseEvent):
    type: str = "plugin.vlm_inference_start"
    inference_id: str
    model: Optional[str] = None
    frames_count: int = 0
Fields
inference_id: Unique identifier for this inference. Auto-generated UUID if not provided.
model: Model being used for this inference.
frames_count: Number of video frames to process.
Example
from vision_agents.core.llm.events import VLMInferenceStartEvent
@manager.subscribe
async def track_vlm_start(event: VLMInferenceStartEvent):
    print(f"VLM inference started: {event.inference_id}")
    print(f"Model: {event.model}, Frames: {event.frames_count}")
VLMInferenceCompletedEvent
Event emitted when a VLM inference completes.
@dataclass
class VLMInferenceCompletedEvent(PluginBaseEvent):
    type: str = "plugin.vlm_inference_completed"
    inference_id: Optional[str] = None
    model: Optional[str] = None
    text: str = ""
    latency_ms: Optional[float] = None
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    frames_processed: int = 0
    detections: int = 0
Fields
inference_id: Unique identifier for this inference.
model: Model used for this inference.
text: Text output from the model.
latency_ms: Total time from request to complete response, in milliseconds.
input_tokens: Number of input tokens (text + image tokens).
output_tokens: Number of output tokens generated.
frames_processed: Number of video frames processed in this inference.
detections: Number of objects/items detected (for detection models).
Example
from vision_agents.core.llm.events import (
    VLMInferenceStartEvent,
    VLMInferenceCompletedEvent,
)

@manager.subscribe
async def track_vlm_inference(event: VLMInferenceStartEvent | VLMInferenceCompletedEvent):
    if isinstance(event, VLMInferenceStartEvent):
        print(f"🎬 VLM inference {event.inference_id} started")
        print(f"Processing {event.frames_count} frames")
    else:
        print(f"✅ VLM inference completed in {event.latency_ms}ms")
        print(f"Result: {event.text}")
        print(f"Frames processed: {event.frames_processed}")
        print(f"Detections: {event.detections}")
VLMErrorEvent
Event emitted when a VLM error occurs.
@dataclass
class VLMErrorEvent(PluginBaseEvent):
    type: str = "plugin.vlm_error"
    error: Optional[Exception] = None
    error_code: Optional[str] = None
    context: Optional[str] = None
    inference_id: Optional[str] = None
    is_recoverable: bool = True
Complete Example
from vision_agents.core.events.manager import EventManager
from vision_agents.core.llm.events import (
    LLMRequestStartedEvent,
    LLMResponseChunkEvent,
    LLMResponseCompletedEvent,
    LLMErrorEvent,
    ToolStartEvent,
    ToolEndEvent,
    VLMInferenceStartEvent,
    VLMInferenceCompletedEvent,
)

# Create event manager
manager = EventManager()

# Register events
manager.register(
    LLMRequestStartedEvent,
    LLMResponseChunkEvent,
    LLMResponseCompletedEvent,
    LLMErrorEvent,
    ToolStartEvent,
    ToolEndEvent,
    VLMInferenceStartEvent,
    VLMInferenceCompletedEvent,
)

@manager.subscribe
async def handle_llm_request(event: LLMRequestStartedEvent):
    print(f"🚀 LLM request started: {event.request_id}")
    print(f"Model: {event.model}, Streaming: {event.streaming}")

@manager.subscribe
async def handle_response_chunk(event: LLMResponseChunkEvent):
    if event.is_first_chunk:
        print(f"⚡ First token in {event.time_to_first_token_ms}ms")
    if event.delta:
        print(event.delta, end="", flush=True)

@manager.subscribe
async def handle_response_completed(event: LLMResponseCompletedEvent):
    print("\n✅ Response completed")
    print(f"Latency: {event.latency_ms}ms")
    print(f"Tokens: {event.input_tokens} in, {event.output_tokens} out")
    print(f"Model: {event.model}")

@manager.subscribe
async def handle_llm_error(event: LLMErrorEvent):
    print(f"❌ LLM Error: {event.error_message}")
    if not event.is_recoverable:
        print("Fatal error - cannot continue")

# Track tool executions
@manager.subscribe
async def handle_tool_execution(event: ToolStartEvent | ToolEndEvent):
    if isinstance(event, ToolStartEvent):
        print(f"🔧 Tool '{event.tool_name}' started")
        print(f"Arguments: {event.arguments}")
    else:
        if event.success:
            print(f"✅ Tool '{event.tool_name}' completed in {event.execution_time_ms}ms")
        else:
            print(f"❌ Tool '{event.tool_name}' failed: {event.error}")

# Track VLM inferences
@manager.subscribe
async def handle_vlm_inference(event: VLMInferenceStartEvent | VLMInferenceCompletedEvent):
    if isinstance(event, VLMInferenceStartEvent):
        print(f"🎬 VLM inference started: {event.frames_count} frames")
    else:
        print(f"✅ VLM completed: {event.detections} detections in {event.latency_ms}ms")

# Send events
manager.send(LLMRequestStartedEvent(
    plugin_name="openai",
    request_id="req-123",
    model="gpt-4",
    streaming=True,
))
await manager.wait()
Location
vision_agents/core/llm/events.py:1-276