Skip to main content

Overview

The Agent class is the foundation for building video AI applications. It orchestrates audio/video streams, LLM interactions, speech recognition, text-to-speech, and turn detection.
from vision_agents.core import Agent
from vision_agents.edge import Edge
from vision_agents.llm.gemini import Realtime

# Create a realtime agent
agent = Agent(
    edge=Edge(),
    agent_user=agent_user,
    instructions="Read @voice-agent.md",
    llm=Realtime(),
    processors=[],
)

# Join a call
async with agent.join(call):
    await agent.finish()
Don’t reuse the agent object. Create a new agent instance for each call.

Constructor

Agent(
    edge: EdgeTransport,
    llm: LLM | AudioLLM | VideoLLM,
    agent_user: User,
    instructions: str = "Keep your replies short and dont use special characters.",
    stt: Optional[STT] = None,
    tts: Optional[TTS] = None,
    turn_detection: Optional[TurnDetector] = None,
    processors: Optional[list[Processor]] = None,
    mcp_servers: Optional[list[MCPBaseServer]] = None,
    options: Optional[AgentOptions] = None,
    tracer: Tracer = trace.get_tracer("agents"),
    profiler: Optional[Profiler] = None,
    streaming_tts: bool = False,
    broadcast_metrics: bool = False,
    broadcast_metrics_interval: float = 5.0,
    multi_speaker_filter: Optional[AudioFilter] = None,
)

Parameters

edge
EdgeTransport
required
Edge transport for video and audio connectivity.
llm
LLM | AudioLLM | VideoLLM
required
LLM, optionally with audio/video/realtime capabilities.
agent_user
User
required
The agent’s user identity.
instructions
str
System instructions for the LLM. Supports @file.md references.
stt
Optional[STT]
default:"None"
Speech-to-text service. Not needed when using a realtime LLM.
tts
Optional[TTS]
default:"None"
Text-to-speech service. Not needed when using a realtime LLM.
turn_detection
Optional[TurnDetector]
default:"None"
Turn detector for managing conversational turns. Not needed when using a realtime LLM.
processors
Optional[list[Processor]]
default:"None"
Processors that run alongside the agent (e.g. video analysis, data fetching). Their state is passed to the LLM.
mcp_servers
Optional[list[MCPBaseServer]]
default:"None"
MCP servers for external tool and resource access.
options
Optional[AgentOptions]
default:"None"
Agent configuration options. Merged with defaults when provided.
tracer
Tracer
default:"trace.get_tracer('agents')"
OpenTelemetry tracer for distributed tracing.
profiler
Optional[Profiler]
default:"None"
Optional profiler for performance monitoring.
streaming_tts
bool
default:"False"
Send text to TTS as sentences stream from the LLM rather than waiting for the complete response. Reduces perceived latency for non-realtime LLMs that emit LLMResponseChunkEvent.
broadcast_metrics
bool
default:"False"
Whether to periodically broadcast agent metrics to call participants as custom events.
broadcast_metrics_interval
float
default:"5.0"
Interval in seconds between metric broadcasts.
multi_speaker_filter
Optional[AudioFilter]
default:"None"
Audio filter for handling overlapping speech from multiple participants. Takes effect only when more than one participant is present. Defaults to FirstSpeakerWinsFilter, which uses VAD to lock onto the first participant who starts speaking and drops audio from everyone else until the active speaker’s turn ends, or they go silent.

Properties

id

@property
def id(self) -> str
id
str
Unique identifier for the agent instance.

closed

@property
def closed(self) -> bool
closed
bool
Returns True if the agent has been closed.

Methods

join

@asynccontextmanager
async def join(
    self,
    call: Call,
    participant_wait_timeout: Optional[float] = 10.0
) -> AsyncIterator[None]
Join the given call. The agent can join a call only once. Once the call ends, the agent closes itself.
call
Call
required
The call to join.
participant_wait_timeout
Optional[float]
default:"10.0"
Timeout in seconds to wait for other participants to join before proceeding. If 0, do not wait at all. If None, wait forever.
Example:
async with agent.join(call):
    await agent.finish()

wait_for_participant

async def wait_for_participant(
    self,
    timeout: Optional[float] = None
) -> None
Wait for a participant other than the AI agent to join.
timeout
Optional[float]
default:"None"
How long to wait, in seconds, for a participant to join. If None, wait forever.

finish

async def finish(self) -> None
Wait for the call to end gracefully. If no connection is active, returns immediately.

close

async def close(self) -> None
Clean up all connections and resources. Closes MCP connections, realtime output, active media tracks, processor tasks, the call connection, STT/TTS services, and stops turn detection. It is safe to call multiple times.

simple_response

async def simple_response(
    self,
    text: str,
    participant: Optional[Participant] = None
) -> None
Send a text prompt to the LLM for a response.
text
str
required
The text to send to the LLM.
participant
Optional[Participant]
default:"None"
The participant context for this request.

simple_audio_response

async def simple_audio_response(
    self,
    pcm: PcmData,
    participant: Optional[Participant] = None
) -> None
Send audio directly to the LLM for processing (only works with AudioLLM).
pcm
PcmData
required
PCM audio data to process.
participant
Optional[Participant]
default:"None"
The participant context for this request.

say

async def say(
    self,
    text: str,
    user_id: Optional[str] = None,
    metadata: Optional[dict[str, Any]] = None
) -> None
Make the agent say something using TTS.
text
str
required
The text for the agent to say.
user_id
Optional[str]
default:"None"
Optional user ID for the speech.
metadata
Optional[dict[str, Any]]
default:"None"
Optional metadata to include with the speech.
Example:
await agent.say("Hello! How can I help you today?")

subscribe

def subscribe(self, function) -> Disposable
Subscribe a callback to the agent-wide event bus. The event bus is a merged stream of events from the edge, LLM, STT, TTS, VAD, and other registered plugins.
function
Callable
required
Async or sync callable that accepts a single event object.
return
Disposable
A disposable subscription handle.
Example:
@agent.subscribe
async def on_event(event):
    if isinstance(event, STTTranscriptEvent):
        print(f"User said: {event.text}")

authenticate

async def authenticate(self) -> None
Authenticate the agent user with the edge provider. Idempotent — safe to call multiple times.

create_call

async def create_call(
    self,
    call_type: str,
    call_id: str
) -> Call
Create a call in the edge provider. Automatically authenticates if not already done.
call_type
str
required
The type of call to create.
call_id
str
required
Unique identifier for the call.
return
Call
The created call object.

idle_for

def idle_for(self) -> float
Return how long, in seconds, the connection has been idle — that is, how long no participants other than the agent itself have been present. Returns 0.0 if the connection is active.
return
float
Idle time in seconds, or 0.0 if active.

on_call_for

def on_call_for(self) -> float
Return the number of seconds the agent has been on the call. Returns 0.0 if the agent has not joined a call yet.
return
float
Duration in seconds since the agent joined the call, or 0.0 if not on a call.

span

@contextmanager
def span(self, name: str) -> Iterator[Span]
Create an OpenTelemetry span for tracing.
name
str
required
Name of the span.
return
Iterator[Span]
Context manager yielding the span.

Event Handling

The Agent class handles several types of events:
  • STT Flow: AudioReceivedEvent → STTTranscriptEvent → TurnCompleted → LLMResponseCompletedEvent → TTSAudioEvent
  • Realtime Flow: Direct transcriptions from realtime LLM
  • Streaming TTS: LLMResponseChunkEvent chunks are accumulated and sent to TTS at sentence boundaries
  • Video Track Events: Track added/removed events for video processing
  • Error Events: STTErrorEvent and other error handling

Build docs developers (and LLMs) love