
Overview

The Realtime class is an abstract base class for LLMs that can receive and process both audio and video in real time. It extends OmniLLM to provide a full multimodal interface with an event-driven architecture. Location: vision_agents.core.llm.realtime.Realtime

Usage

from typing import Optional

from vision_agents.core.llm.realtime import Realtime
from getstream.video.rtc.track_util import PcmData
from vision_agents.core.edge.types import Participant

class MyRealtimeLLM(Realtime):
    async def connect(self):
        # Establish connection to the real-time API
        self._emit_connected_event(
            session_config={"model": "my-model"},
            capabilities={"audio": True, "video": True}
        )
    
    async def simple_audio_response(
        self,
        pcm: PcmData,
        participant: Optional[Participant] = None
    ):
        # Forward audio to the API
        self._emit_audio_input_event(pcm, participant)
        # Process audio and emit response
        pass
    
    async def close(self):
        # Clean up connection
        self._emit_disconnected_event()

Constructor

def __init__(
    self,
    fps: int = 1,  # Video frames per second to send
):
fps
int
default: 1
The number of video frames per second to send to the model (for implementations that support setting fps).

Abstract Methods

connect
async method
required
Establish a connection to the real-time API. Implementations should:
  1. Connect to the provider’s WebSocket or streaming API
  2. Call _emit_connected_event() when ready
  3. Set up message handlers
Example:
async def connect(self):
    self.ws = await websocket.connect(url)
    self._emit_connected_event(
        session_config={"model": self.model},
        capabilities={"audio": True, "video": True}
    )
simple_audio_response
async method
required
Process incoming audio and generate a response. Parameters:
  • pcm (PcmData): PCM audio data to process
  • participant (Optional[Participant]): Participant who sent the audio
Implementations should:
  1. Call _emit_audio_input_event() when receiving audio
  2. Forward audio to the provider’s API
  3. Call _emit_audio_output_event() when generating response audio
  4. Call _emit_audio_output_done_event() when complete
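The four steps above can be sketched as follows. This is a minimal illustration of the call order only: the `_emit_*` helpers are stubbed to record events (real implementations inherit them from Realtime), `FakePcmData` is a stand-in for getstream's PcmData, and the provider round-trip is faked as an echo.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakePcmData:
    """Stand-in for getstream's PcmData, for illustration only."""
    samples: bytes


class SketchRealtime:
    """Sketch of the documented call order; the _emit_* helpers are
    stubbed here -- real implementations inherit them from Realtime."""

    def __init__(self):
        self.events = []

    def _emit_audio_input_event(self, audio_data, user_metadata=None):
        self.events.append("audio_input")

    def _emit_audio_output_event(self, audio_data, response_id=None):
        self.events.append("audio_output")

    def _emit_audio_output_done_event(self, response_id=None):
        self.events.append("audio_output_done")

    async def simple_audio_response(self, pcm, participant=None):
        self._emit_audio_input_event(pcm)        # 1. audio received
        response = pcm                           # 2./3. provider round-trip (stubbed as an echo)
        self._emit_audio_output_event(response)  # 3. response audio generated
        self._emit_audio_output_done_event()     # 4. response complete


llm = SketchRealtime()
asyncio.run(llm.simple_audio_response(FakePcmData(b"\x00\x00")))
```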
close
async method
required
Close the connection and clean up resources. Implementations should:
  1. Close WebSocket/streaming connections
  2. Cancel background tasks
  3. Call _emit_disconnected_event()
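A sketch of that teardown sequence, with the connection and emit helper stubbed for illustration (the background listen loop is faked as a long sleep):

```python
import asyncio


class SketchRealtime:
    """Sketch of the documented close() steps; _emit_disconnected_event
    is stubbed for illustration."""

    def __init__(self):
        self.disconnect_reason = None
        self._bg_task = None

    def _emit_disconnected_event(self, reason=None, was_clean=True):
        self.disconnect_reason = reason

    async def connect(self):
        # A long-running listen loop, stubbed as a sleep.
        self._bg_task = asyncio.create_task(asyncio.sleep(3600))

    async def close(self):
        # 1. close WebSocket/streaming connections (none in this sketch)
        # 2. cancel background tasks
        if self._bg_task is not None:
            self._bg_task.cancel()
            try:
                await self._bg_task
            except asyncio.CancelledError:
                pass
        # 3. emit the disconnected event
        self._emit_disconnected_event(reason="client shutdown")


async def main():
    llm = SketchRealtime()
    await llm.connect()
    await llm.close()
    return llm


llm = asyncio.run(main())
```

Awaiting the cancelled task before emitting the disconnected event ensures no background handler fires after the session is reported closed.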

Properties

connected
bool
Whether the connection is currently active.
session_id
str
UUID identifying this session. Automatically generated.
provider_name
str
Name of the provider (e.g., “gemini_realtime”, “openai_realtime”).
fps
int
Video frames per second being sent to the model.
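The properties can be pictured on a stand-in class like the one below; the real base class manages these values itself, and the `provider_name` shown is just a placeholder.

```python
import uuid


class SketchRealtime:
    """Illustrative stand-in for the documented properties and their
    defaults; the real Realtime base class manages these itself."""

    provider_name = "sketch_realtime"  # real plugins use e.g. "gemini_realtime"

    def __init__(self, fps: int = 1):
        self.fps = fps
        self.connected = False               # flipped by connect()/close()
        self.session_id = str(uuid.uuid4())  # auto-generated per session


llm = SketchRealtime(fps=2)
```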

Event Emission Methods

The Realtime class provides helper methods for emitting structured events:

Connection Events

_emit_connected_event
method
Emit a connected event when the session starts. Parameters:
  • session_config (Optional[dict]): Session configuration details
  • capabilities (Optional[dict]): API capabilities (audio, video, etc.)
Example:
self._emit_connected_event(
    session_config={"model": "gemini-2.5-flash"},
    capabilities={"audio": True, "video": True, "tools": True}
)
_emit_disconnected_event
method
Emit a disconnected event when the session ends. Parameters:
  • reason (Optional[str]): Reason for disconnection
  • was_clean (bool): Whether the disconnection was clean. Default: True
Example:
self._emit_disconnected_event(
    reason="Session timeout",
    was_clean=False
)

Audio Events

_emit_audio_input_event
method
Emit an event when audio input is received. Parameters:
  • audio_data (PcmData): The audio data
  • user_metadata (Optional[dict]): User metadata
_emit_audio_output_event
method
Emit an event when audio output is generated. Parameters:
  • audio_data (PcmData): The audio data
  • response_id (Optional[str]): Response identifier
  • user_metadata (Optional[dict]): User metadata
_emit_audio_output_done_event
method
Emit an event when audio output is complete. Parameters:
  • response_id (Optional[str]): Response identifier
  • user_metadata (Optional[dict]): User metadata
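A typical pattern is to emit one output event per audio chunk streamed back from the provider, all under the same response_id, and then one done event. A sketch with the emit helpers stubbed (the response id and audio bytes are made up for illustration):

```python
class SketchRealtime:
    """Stubbed emit helpers recording the chunk/done sequence."""

    def __init__(self):
        self.log = []

    def _emit_audio_output_event(self, audio_data, response_id=None, user_metadata=None):
        self.log.append(("output", response_id))

    def _emit_audio_output_done_event(self, response_id=None, user_metadata=None):
        self.log.append(("done", response_id))


llm = SketchRealtime()
rid = "resp-42"  # hypothetical provider response id
for chunk in (b"\x00\x01", b"\x02\x03"):  # pretend audio chunks from the provider
    llm._emit_audio_output_event(chunk, response_id=rid)
llm._emit_audio_output_done_event(response_id=rid)
```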

Response Events

_emit_response_event
method
Emit a text response event. Parameters:
  • text (str): The response text
  • response_id (Optional[str]): Response identifier
  • is_complete (bool): Whether the response is complete. Default: True
  • conversation_item_id (Optional[str]): Conversation item ID
  • user_metadata (Optional[dict]): User metadata
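For providers that stream text, partial responses can be emitted with is_complete=False and the final text with is_complete=True. A sketch with the emit helper stubbed to record what was emitted (the response id and text are illustrative):

```python
class SketchRealtime:
    """Stubbed _emit_response_event recording (text, is_complete) pairs."""

    def __init__(self):
        self.responses = []

    def _emit_response_event(self, text, response_id=None, is_complete=True,
                             conversation_item_id=None, user_metadata=None):
        self.responses.append((text, is_complete))


llm = SketchRealtime()
rid = "resp-1"  # hypothetical response id
for partial in ("Hello", "Hello wor"):  # streaming deltas from the provider
    llm._emit_response_event(partial, response_id=rid, is_complete=False)
llm._emit_response_event("Hello world", response_id=rid, is_complete=True)
```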

Transcription Events

_emit_user_speech_transcription
method
Emit a user speech transcription event. Parameters:
  • text (str): Transcribed text
  • original (Optional[Any]): Original provider response
_emit_agent_speech_transcription
method
Emit an agent speech transcription event. Parameters:
  • text (str): Transcribed text
  • original (Optional[Any]): Original provider response

Error Events

_emit_error_event
method
Emit an error event. Parameters:
  • error (Exception): The error that occurred
  • context (str): Error context. Default: ""
  • user_metadata (Optional[dict]): User metadata
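A common place to call this is a try/except around the provider listen loop. A sketch with the emit helper stubbed and a simulated failure (the context string is just an example):

```python
class SketchRealtime:
    """Stubbed _emit_error_event recording errors with their context."""

    def __init__(self):
        self.errors = []

    def _emit_error_event(self, error, context="", user_metadata=None):
        self.errors.append((type(error).__name__, context))


llm = SketchRealtime()
try:
    raise ConnectionError("websocket dropped")  # simulated provider failure
except Exception as exc:
    llm._emit_error_event(exc, context="listen loop")
```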

Plugin Example

Here’s how the Gemini Realtime plugin implements this interface:
import asyncio
from typing import Optional

from google import genai
from google.genai.live import AsyncSession
from google.genai.types import (
    AudioTranscriptionConfigDict,
    LiveConnectConfigDict,
    Modality,
)

from vision_agents.core.llm.realtime import Realtime
from getstream.video.rtc.track_util import PcmData
from vision_agents.core.edge.types import Participant

class GeminiRealtime(Realtime):
    def __init__(
        self,
        api_key: str,
        model: str = "gemini-2.5-flash-native-audio-preview",
        fps: int = 1,
    ):
        super().__init__(fps=fps)
        self.api_key = api_key
        self.model = model
        self.session: Optional[AsyncSession] = None
    
    async def connect(self):
        # Create Gemini Live API session
        client = genai.Client(api_key=self.api_key)
        config = LiveConnectConfigDict(
            response_modalities=[Modality.AUDIO],
            input_audio_transcription=AudioTranscriptionConfigDict(),
        )
        self.session = client.aio.live.connect(
            model=self.model,
            config=config
        )
        
        # Start listening for server messages
        asyncio.create_task(self._listen_loop())
        
        # Emit connected event
        self._emit_connected_event(
            session_config={"model": self.model},
            capabilities={"audio": True, "video": True}
        )
    
    async def simple_audio_response(
        self,
        pcm: PcmData,
        participant: Optional[Participant] = None
    ):
        if not self.session:
            raise RuntimeError("Not connected")
        
        # Emit input event
        self._emit_audio_input_event(pcm, participant)
        
        # Send to Gemini
        await self.session.send(
            {"data": pcm.samples.tobytes(), "mime_type": "audio/pcm"}
        )
    
    async def _listen_loop(self):
        async for message in self.session.receive():
            if hasattr(message, 'audio'):
                # Emit output audio
                pcm = PcmData.from_bytes(
                    message.audio,
                    sample_rate=24000,
                    channels=1
                )
                self._emit_audio_output_event(pcm)
    
    async def close(self):
        if self.session:
            await self.session.close()
        self._emit_disconnected_event()

Events

Realtime implementations emit the following events:
  • RealtimeConnectedEvent - When session connects
  • RealtimeDisconnectedEvent - When session disconnects
  • RealtimeAudioInputEvent - When audio input is received
  • RealtimeAudioOutputEvent - When audio output is generated
  • RealtimeAudioOutputDoneEvent - When audio output completes
  • RealtimeResponseEvent - Text responses
  • RealtimeUserSpeechTranscriptionEvent - User speech transcriptions
  • RealtimeAgentSpeechTranscriptionEvent - Agent speech transcriptions
  • RealtimeErrorEvent - Errors during processing
See Events for more details.
