Overview
The Agent class is the foundation for building video AI applications. It orchestrates audio/video streams, LLM interactions, speech recognition, text-to-speech, and turn detection.
```python
from vision_agents.core import Agent
from vision_agents.edge import Edge
from vision_agents.llm.gemini import Realtime

# Create a realtime agent
agent = Agent(
    edge=Edge(),
    agent_user=agent_user,
    instructions="Read @voice-agent.md",
    llm=Realtime(),
    processors=[],
)

# Join a call
async with agent.join(call):
    await agent.finish()
```
Don’t reuse the agent object. Create a new agent instance for each call.
Constructor
```python
Agent(
    edge: EdgeTransport,
    llm: LLM | AudioLLM | VideoLLM,
    agent_user: User,
    instructions: str = "Keep your replies short and dont use special characters.",
    stt: Optional[STT] = None,
    tts: Optional[TTS] = None,
    turn_detection: Optional[TurnDetector] = None,
    processors: Optional[list[Processor]] = None,
    mcp_servers: Optional[list[MCPBaseServer]] = None,
    options: Optional[AgentOptions] = None,
    tracer: Tracer = trace.get_tracer("agents"),
    profiler: Optional[Profiler] = None,
    streaming_tts: bool = False,
    broadcast_metrics: bool = False,
    broadcast_metrics_interval: float = 5.0,
    multi_speaker_filter: Optional[AudioFilter] = None,
)
```
Parameters
edge
EdgeTransport
required
Edge transport for video and audio connectivity.
llm
LLM | AudioLLM | VideoLLM
required
LLM, optionally with audio/video/realtime capabilities.
agent_user
User
required
The agent’s user identity.
instructions
str
default:"Keep your replies short and dont use special characters."
System instructions for the LLM. Supports @file.md references.
stt
Optional[STT]
default:"None"
Speech-to-text service. Not needed when using a realtime LLM.
tts
Optional[TTS]
default:"None"
Text-to-speech service. Not needed when using a realtime LLM.
turn_detection
Optional[TurnDetector]
default:"None"
Turn detector for managing conversational turns. Not needed when using a realtime LLM.
processors
Optional[list[Processor]]
default:"None"
Processors that run alongside the agent (e.g. video analysis, data fetching). Their state is passed to the LLM.
mcp_servers
Optional[list[MCPBaseServer]]
default:"None"
MCP servers for external tool and resource access.
options
Optional[AgentOptions]
default:"None"
Agent configuration options. Merged with defaults when provided.
tracer
Tracer
default:"trace.get_tracer('agents')"
OpenTelemetry tracer for distributed tracing.
profiler
Optional[Profiler]
default:"None"
Optional profiler for performance monitoring.
streaming_tts
bool
default:"False"
Send text to TTS as sentences stream from the LLM rather than waiting for the complete response. Reduces perceived latency for non-realtime LLMs that emit LLMResponseChunkEvent.
broadcast_metrics
bool
default:"False"
Whether to periodically broadcast agent metrics to call participants as custom events.
broadcast_metrics_interval
float
default:"5.0"
Interval in seconds between metric broadcasts.
multi_speaker_filter
Optional[AudioFilter]
default:"None"
Audio filter for handling overlapping speech from multiple participants. Takes effect only when more than one participant is present. Defaults to FirstSpeakerWinsFilter, which uses VAD to lock onto the first participant who starts speaking and drops audio from everyone else until the active speaker’s turn ends or they go silent.
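The first-speaker-wins behavior can be sketched as a small stateful filter. The class and method names below are hypothetical stand-ins for illustration, not the library's actual FirstSpeakerWinsFilter API:

```python
from typing import Optional


class FirstSpeakerWins:
    """Toy model of a first-speaker-wins audio filter.

    Locks onto the first participant whose audio contains speech
    (as reported by a VAD) and drops audio from everyone else
    until that speaker goes silent.
    """

    def __init__(self) -> None:
        self.active_speaker: Optional[str] = None

    def process(self, participant_id: str, frame: bytes, is_speech: bool) -> Optional[bytes]:
        # No one holds the floor yet: the first speech frame wins it.
        if self.active_speaker is None:
            if is_speech:
                self.active_speaker = participant_id
                return frame
            return None
        # The active speaker's frames pass through; their silence
        # releases the floor for the next speaker.
        if participant_id == self.active_speaker:
            if not is_speech:
                self.active_speaker = None
            return frame
        # Everyone else is muted while the floor is held.
        return None
```

A custom filter could instead mix the streams or prefer the loudest speaker; first-speaker-wins is simply the least surprising default for turn-based conversation.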
Properties
id
@property
def id(self) -> str
Unique identifier for the agent instance.
closed
@property
def closed(self) -> bool
Returns True if the agent has been closed.
Methods
join
```python
@asynccontextmanager
async def join(
    self,
    call: Call,
    participant_wait_timeout: Optional[float] = 10.0
) -> AsyncIterator[None]
```
Join the given call. The agent can join a call only once. Once the call ends, the agent closes itself.
participant_wait_timeout
Optional[float]
default:"10.0"
Timeout in seconds to wait for other participants to join before proceeding. If 0, do not wait at all. If None, wait forever.
Example:
```python
async with agent.join(call):
    await agent.finish()
```
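The three participant_wait_timeout cases map naturally onto asyncio primitives. A self-contained sketch of the documented semantics, not the library's internals (wait_for_first_participant is a hypothetical helper):

```python
import asyncio
from typing import Optional


async def wait_for_first_participant(
    participant_joined: asyncio.Event,
    timeout: Optional[float],
) -> bool:
    """Model of the documented timeout semantics.

    timeout > 0     -> wait up to `timeout` seconds
    timeout == 0    -> do not wait at all
    timeout is None -> wait forever

    Returns True if a participant joined in time.
    """
    if timeout == 0:
        return participant_joined.is_set()
    try:
        # asyncio.wait_for with timeout=None waits indefinitely.
        await asyncio.wait_for(participant_joined.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
```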
wait_for_participant
```python
async def wait_for_participant(
    self,
    timeout: Optional[float] = None
) -> None
```
Wait for a participant other than the AI agent to join.
timeout
Optional[float]
default:"None"
How long to wait for the participant to join in seconds. If None, wait forever.
finish
async def finish(self) -> None
Wait for the call to end gracefully. If no connection is active, returns immediately.
close
async def close(self) -> None
Clean up all connections and resources. Closes MCP connections, realtime output, active media tracks, processor tasks, the call connection, STT/TTS services, and stops turn detection. It is safe to call multiple times.
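The safe-to-call-multiple-times guarantee is typically a guard flag checked on entry. A minimal illustrative pattern (Closable is a hypothetical stand-in, not the Agent implementation):

```python
class Closable:
    """Minimal idempotent-close pattern: the second and later
    calls to close() return immediately."""

    def __init__(self) -> None:
        self._closed = False
        self.cleanup_runs = 0  # instrumentation for the example

    @property
    def closed(self) -> bool:
        return self._closed

    async def close(self) -> None:
        if self._closed:
            return
        # Set the flag first so re-entrant calls are no-ops even
        # while the cleanup below is still in flight.
        self._closed = True
        # ... close MCP connections, media tracks, STT/TTS, etc. ...
        self.cleanup_runs += 1
```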
simple_response
```python
async def simple_response(
    self,
    text: str,
    participant: Optional[Participant] = None
) -> None
```
Send a text prompt to the LLM for a response.
text
str
required
The text to send to the LLM.
participant
Optional[Participant]
default:"None"
The participant context for this request.
simple_audio_response
```python
async def simple_audio_response(
    self,
    pcm: PcmData,
    participant: Optional[Participant] = None
) -> None
```
Send audio directly to the LLM for processing (only works with AudioLLM).
pcm
PcmData
required
PCM audio data to process.
participant
Optional[Participant]
default:"None"
The participant context for this request.
say
```python
async def say(
    self,
    text: str,
    user_id: Optional[str] = None,
    metadata: Optional[dict[str, Any]] = None
) -> None
```
Make the agent say something using TTS.
text
str
required
The text for the agent to say.
user_id
Optional[str]
default:"None"
Optional user ID for the speech.
metadata
Optional[dict[str, Any]]
default:"None"
Optional metadata to include with the speech.
Example:
```python
await agent.say("Hello! How can I help you today?")
```
subscribe
def subscribe(self, function) -> Disposable
Subscribe a callback to the agent-wide event bus. The event bus is a merged stream of events from the edge, LLM, STT, TTS, VAD, and other registered plugins.
function
required
Async or sync callable that accepts a single event object.
A disposable subscription handle.
Example:
```python
@agent.subscribe
async def on_event(event):
    if isinstance(event, STTTranscriptEvent):
        print(f"User said: {event.text}")
```
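The returned Disposable lets you detach the callback later. A toy model of the bus/handle relationship (EventBus, emit, and dispose are assumptions for illustration; only subscribe mirrors the documented API):

```python
from typing import Any, Callable, List


class Disposable:
    """Handle that removes a callback from the bus when disposed."""

    def __init__(self, listeners: List[Callable[[Any], None]], fn: Callable[[Any], None]) -> None:
        self._listeners = listeners
        self._fn = fn

    def dispose(self) -> None:
        # Disposing twice is harmless.
        if self._fn in self._listeners:
            self._listeners.remove(self._fn)


class EventBus:
    """Toy merged event bus: subscribe returns a Disposable."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[Any], None]] = []

    def subscribe(self, fn: Callable[[Any], None]) -> Disposable:
        self._listeners.append(fn)
        return Disposable(self._listeners, fn)

    def emit(self, event: Any) -> None:
        # Copy so callbacks may dispose themselves mid-emit.
        for fn in list(self._listeners):
            fn(event)
```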
authenticate
async def authenticate(self) -> None
Authenticate the agent user with the edge provider. Idempotent — safe to call multiple times.
create_call
```python
async def create_call(
    self,
    call_type: str,
    call_id: str
) -> Call
```
Create a call in the edge provider. Automatically authenticates if not already done.
call_type
str
required
The type of call to create.
call_id
str
required
Unique identifier for the call.
idle_for
def idle_for(self) -> float
Return how long this connection has been idle, i.e. how long no participants other than the agent have been present. Returns 0.0 if the connection is active.
Idle time in seconds, or 0.0 if active.
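idle_for() is useful for auto-shutdown policies, e.g. closing the agent after a minute with no other participants. A self-contained model of the idle bookkeeping (IdleTracker and the injected clock are purely illustrative, not the Agent internals):

```python
import time
from typing import Callable, Optional


class IdleTracker:
    """Tracks how long a connection has had no participants besides
    the agent; 0.0 means the connection is active."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._idle_since: Optional[float] = None

    def participants_changed(self, others: int) -> None:
        if others > 0:
            # Someone besides the agent is present: connection is active.
            self._idle_since = None
        elif self._idle_since is None:
            # Just became idle: remember when.
            self._idle_since = self._clock()

    def idle_for(self) -> float:
        if self._idle_since is None:
            return 0.0
        return self._clock() - self._idle_since
```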
on_call_for
def on_call_for(self) -> float
Return how many seconds the agent has been on the call. Returns 0.0 if the agent has not joined a call yet.
Duration in seconds since the agent joined the call, or 0.0 if not on a call.
span
@contextmanager
def span(self, name: str) -> Iterator[Span]
Create an OpenTelemetry span for tracing.
Context manager yielding the span.
Event Handling
The Agent class handles several types of events:
- STT Flow: AudioReceivedEvent → STTTranscriptEvent → TurnCompleted → LLMResponseCompletedEvent → TTSAudioEvent
- Realtime Flow: Direct transcriptions from realtime LLM
- Streaming TTS: LLMResponseChunkEvent chunks are accumulated and sent to TTS at sentence boundaries
- Video Track Events: Track added/removed events for video processing
- Error Events: STTErrorEvent and other error handling
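The sentence-boundary accumulation in the streaming-TTS flow above can be sketched as follows; the regex-based boundary detection is a simplifying assumption for illustration, not the library's actual segmentation:

```python
import re
from typing import List, Tuple

# A "complete sentence" here is anything ending in . ! or ?
# followed by whitespace or end of buffer (a deliberate simplification).
_SENTENCE_END = re.compile(r"(.*?[.!?])(?:\s+|$)", re.S)


def flush_sentences(buffer: str) -> Tuple[List[str], str]:
    """Split complete sentences off the front of the buffer,
    returning (sentences ready for TTS, remaining partial text)."""
    sentences: List[str] = []
    rest = buffer
    while True:
        m = _SENTENCE_END.match(rest)
        if not m:
            break
        sentences.append(m.group(1))
        rest = rest[m.end():]
    return sentences, rest
```

In the agent, each LLMResponseChunkEvent would append to the buffer, the completed sentences would be sent to TTS immediately, and the partial remainder would wait for the next chunk.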