Overview

The STT (Speech-to-Text) class is an abstract base class for implementing speech recognition in Vision Agents. It provides a standardized interface for transcribing audio streams to text, with support for partial transcripts, turn detection, and error handling.

Location: vision_agents.core.stt.stt.STT

Usage

from vision_agents.core.stt.stt import STT
from getstream.video.rtc.track_util import PcmData
from vision_agents.core.edge.types import Participant
from vision_agents.core.stt.events import TranscriptResponse

class MySTT(STT):
    async def process_audio(
        self,
        pcm_data: PcmData,
        participant: Participant,
    ):
        # Send audio to STT service
        result = await self.transcribe(pcm_data)
        
        # Emit partial transcript
        if result.is_partial:
            self._emit_partial_transcript_event(
                text=result.text,
                participant=participant,
                response=TranscriptResponse(
                    confidence=result.confidence,
                    language="en",
                    model_name=self.model
                )
            )
        # Emit final transcript
        else:
            self._emit_transcript_event(
                text=result.text,
                participant=participant,
                response=TranscriptResponse(
                    confidence=result.confidence,
                    language="en",
                    model_name=self.model
                )
            )

Constructor

def __init__(
    self,
    provider_name: Optional[str] = None,
):
provider_name
Optional[str]
Name of the STT provider (e.g., “elevenlabs”, “deepgram”, “whisper”). If not provided, uses the class name.
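The fallback behavior can be illustrated with a small stand-in. Note that `STTSketch` below only mirrors the documented default and is not the real base class (which lives at vision_agents.core.stt.stt.STT):

```python
from typing import Optional

# Stand-in mirroring the documented constructor behavior (not the real base class)
class STTSketch:
    def __init__(self, provider_name: Optional[str] = None):
        # Fall back to the subclass name when no provider name is given
        self.provider_name = provider_name or type(self).__name__

class WhisperSTT(STTSketch):
    pass

print(STTSketch(provider_name="deepgram").provider_name)  # deepgram
print(WhisperSTT().provider_name)  # WhisperSTT
```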

Abstract Methods

process_audio
async method
required
Process incoming audio data for transcription. This method is called approximately every 20ms with new audio data. Implementations should:
  1. Buffer or send audio to the STT service
  2. Emit partial transcripts as they become available
  3. Emit final transcripts when speech segments complete
  4. Emit turn events if turn detection is supported
Parameters:
  • pcm_data (PcmData): PCM audio data to process
  • participant (Participant): Participant who is speaking
Example:
async def process_audio(self, pcm_data: PcmData, participant: Participant):
    # Resample to required format
    resampled = pcm_data.resample(16000, 1)
    
    # Send to STT service
    await self.connection.send(resampled.samples.tobytes())

Lifecycle Methods

start
async method
Initialize the STT service and prepare for audio processing. Override this method to:
  • Establish connections to STT APIs
  • Start background tasks
  • Initialize audio buffers
Note: Base implementation sets self.started = True and prevents double-start.
Example:
async def start(self):
    await super().start()
    self.ws = await websocket.connect(self.url)
    self._listen_task = asyncio.create_task(self._listen_loop())
clear
async method
Clear any pending audio or internal state. Called when:
  • User stops speaking (turn ends)
  • Agent needs to interrupt
  • Conversation needs to be reset
Override to flush buffers and reset state.
Example:
async def clear(self):
    if self.audio_buffer:
        self.audio_buffer.clear()
    if self.connection:
        await self.connection.flush()
close
async method
Close the STT service and release resources. Base implementation sets self.closed = True. Override to:
  • Close WebSocket connections
  • Cancel background tasks
  • Clean up buffers
Example:
async def close(self):
    await super().close()
    if self._listen_task:
        self._listen_task.cancel()
    if self.ws:
        await self.ws.close()

Event Emission Methods

STT implementations must call these methods to emit events:
_emit_transcript_event
method
Emit a final transcript event.
Parameters:
  • text (str): The transcribed text
  • participant (Participant): Participant metadata
  • response (TranscriptResponse): Transcription response metadata
Example:
self._emit_transcript_event(
    text="Hello, how are you?",
    participant=participant,
    response=TranscriptResponse(
        confidence=0.95,
        language="en",
        model_name="scribe_v2",
        processing_time_ms=150.0
    )
)
_emit_partial_transcript_event
method
Emit a partial (interim) transcript event.
Parameters:
  • text (str): The partial transcribed text
  • participant (Participant): Participant metadata
  • response (TranscriptResponse): Transcription response metadata
Partial transcripts are useful for:
  • Real-time UI updates
  • Early turn detection
  • Responsive user feedback
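The partial-then-final flow can be sketched end to end. The classes below are stand-ins for the emit hooks and TranscriptResponse (field names follow the examples on this page; the real classes live in vision_agents.core.stt):

```python
from dataclasses import dataclass
from typing import List

# Stand-in for vision_agents.core.stt.events.TranscriptResponse (illustrative only)
@dataclass
class TranscriptResponse:
    confidence: float
    language: str
    model_name: str

class PartialEmitterSketch:
    def __init__(self):
        self.partials: List[str] = []
        self.finals: List[str] = []

    def _emit_partial_transcript_event(self, text, participant, response):
        self.partials.append(text)  # drive real-time UI updates from here

    def _emit_transcript_event(self, text, participant, response):
        self.finals.append(text)    # committed text for the conversation log

stt = PartialEmitterSketch()
resp = TranscriptResponse(confidence=0.6, language="en", model_name="scribe_v2")
# Partials arrive as the utterance grows, then a single final transcript
stt._emit_partial_transcript_event("Hello", None, resp)
stt._emit_partial_transcript_event("Hello, how", None, resp)
stt._emit_transcript_event("Hello, how are you?", None, resp)
print(stt.partials, stt.finals)
```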
_emit_error_event
method
Emit an error event for temporary errors.
Parameters:
  • error (Exception): The error that occurred
  • participant (Optional[Participant]): Participant metadata
  • context (str): Error context description
Note: Only emit for recoverable/temporary errors. Permanent errors (invalid config, auth failures) should be raised directly.
Example:
try:
    await self.connection.send(audio)
except ConnectionError as e:
    self._emit_error_event(
        error=e,
        participant=participant,
        context="Failed to send audio to STT service"
    )
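The recoverable-vs-permanent split can be sketched as follows. The exception names are hypothetical; only the emit-vs-raise decision mirrors the note above:

```python
# Hypothetical error types for illustration
class AuthError(Exception): ...
class TransientNetworkError(Exception): ...

class ErrorHandlingSketch:
    def __init__(self):
        self.emitted = []

    def _emit_error_event(self, error, participant=None, context=""):
        self.emitted.append((type(error).__name__, context))

    def handle(self, error):
        if isinstance(error, AuthError):
            raise error  # permanent: misconfiguration, surface immediately
        # transient: emit an error event and keep the session alive
        self._emit_error_event(error, context="transient STT failure")

h = ErrorHandlingSketch()
h.handle(TransientNetworkError("timeout"))
print(h.emitted)
try:
    h.handle(AuthError("bad api key"))
except AuthError:
    print("raised")
```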

Turn Detection Methods

If your STT service supports turn detection, set turn_detection = True and emit these events:
_emit_turn_started_event
method
Emit an event when a user starts speaking.
Parameters:
  • participant (Participant): Participant who started speaking
  • confidence (Optional[float]): Confidence of turn detection (0.0-1.0). Default: 0.5
Example:
self._emit_turn_started_event(
    participant=participant,
    confidence=0.9
)
_emit_turn_ended_event
method
Emit an event when a user stops speaking.
Parameters:
  • participant (Participant): Participant who stopped speaking
  • eager_end_of_turn (bool): Whether this is an early/eager turn end. Default: False
  • confidence (Optional[float]): Confidence of turn detection (0.0-1.0). Default: 0.5
Example:
self._emit_turn_ended_event(
    participant=participant,
    eager_end_of_turn=False,
    confidence=0.85
)
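For services without built-in turn detection, a simple energy-based detector can decide when to call these hooks. This is an illustrative sketch only; the threshold, hangover length, and confidence values are assumptions, not part of the API:

```python
SPEECH_THRESHOLD = 500  # mean absolute sample amplitude (assumed)
HANGOVER_FRAMES = 10    # ~200ms of silence at 20ms frames ends the turn (assumed)

class TurnDetectorSketch:
    def __init__(self):
        self.in_turn = False
        self.silent_frames = 0
        self.events = []

    # Stand-ins for the documented emit hooks
    def _emit_turn_started_event(self, confidence=0.5):
        self.events.append(("started", confidence))

    def _emit_turn_ended_event(self, eager_end_of_turn=False, confidence=0.5):
        self.events.append(("ended", confidence))

    def feed(self, samples):
        # Crude voice-activity check on one ~20ms frame of PCM samples
        energy = sum(abs(s) for s in samples) / max(len(samples), 1)
        if energy >= SPEECH_THRESHOLD:
            if not self.in_turn:
                self.in_turn = True
                self._emit_turn_started_event(confidence=0.9)
            self.silent_frames = 0
        elif self.in_turn:
            self.silent_frames += 1
            if self.silent_frames >= HANGOVER_FRAMES:
                self.in_turn = False
                self._emit_turn_ended_event(confidence=0.85)

det = TurnDetectorSketch()
det.feed([1000] * 320)      # loud frame: turn starts
for _ in range(10):
    det.feed([0] * 320)     # sustained silence: turn ends after the hangover
print(det.events)
```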

Properties

closed
bool
Whether the STT service has been closed.
started
bool
Whether the STT service has been started.
turn_detection
bool
Whether this STT implementation supports turn detection. Set to True in subclasses that support it.
session_id
str
Unique session identifier (UUID). Automatically generated.
provider_name
str
Name of the STT provider.
events
EventManager
Event manager for emitting STT events.
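The lifecycle flags interact as follows. This stub only mirrors the documented semantics (double-start protection, closed flag, generated session_id) and is not the real implementation:

```python
import asyncio
import uuid
from typing import Optional

# Stand-in mirroring the documented lifecycle contract (not the real base class)
class LifecycleSketch:
    def __init__(self, provider_name: Optional[str] = None):
        self.provider_name = provider_name or type(self).__name__
        self.session_id = str(uuid.uuid4())  # unique per session, auto-generated
        self.started = False
        self.closed = False
        self.turn_detection = False  # subclasses with turn detection set True

    async def start(self):
        if self.started:  # prevents double-start, as documented
            return
        self.started = True

    async def close(self):
        self.closed = True

async def main():
    stt = LifecycleSketch(provider_name="demo")
    await stt.start()
    await stt.start()  # safe: second call is a no-op
    await stt.close()
    return stt

stt = asyncio.run(main())
print(stt.provider_name, stt.started, stt.closed, len(stt.session_id))
```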

Plugin Example

Here’s how the ElevenLabs plugin implements the STT interface:
from typing import Optional
import base64

from elevenlabs import AsyncElevenLabs, RealtimeConnection

from getstream.video.rtc.track_util import PcmData
from vision_agents.core.edge.types import Participant
from vision_agents.core.stt.events import TranscriptResponse
from vision_agents.core.stt.stt import STT as BaseSTT

class STT(BaseSTT):
    turn_detection: bool = False  # Scribe v2 doesn't support turn detection
    
    def __init__(
        self,
        api_key: Optional[str] = None,
        model_id: str = "scribe_v2_realtime",
        language_code: str = "en",
    ):
        super().__init__(provider_name="elevenlabs")
        self.client = AsyncElevenLabs(api_key=api_key)
        self.model_id = model_id
        self.language_code = language_code
        self.connection: Optional[RealtimeConnection] = None
        self._audio_queue = AudioQueue(buffer_limit_ms=10000)
        self._current_participant = None  # updated on each process_audio call
    
    async def start(self):
        await super().start()
        
        # Connect to ElevenLabs Scribe API
        audio_options = {
            "model_id": self.model_id,
            "language_code": self.language_code,
            "audio_format": AudioFormat.PCM_16000,
            "sample_rate": 16000,
        }
        
        self.connection = await self.client.speech_to_text.realtime.connect(
            audio_options
        )
        
        # Register event handlers
        self.connection.on(
            RealtimeEvents.PARTIAL_TRANSCRIPT,
            self._on_partial_transcript
        )
        self.connection.on(
            RealtimeEvents.COMMITTED_TRANSCRIPT,
            self._on_committed_transcript
        )
    
    async def process_audio(
        self,
        pcm_data: PcmData,
        participant: Participant,
    ):
        # Track the most recent speaker so transcript callbacks can attribute text
        self._current_participant = participant
        # Resample to 16kHz mono (required by ElevenLabs)
        resampled = pcm_data.resample(16000, 1)
        
        # Add to queue for batching
        await self._audio_queue.put(resampled)
        
        # Send batched audio to ElevenLabs
        chunk = await self._audio_queue.get_duration(100)  # 100ms chunks
        await self.connection.send({
            "audio_base_64": base64.b64encode(chunk.samples.tobytes()),
            "sample_rate": 16000,
        })
    
    def _on_partial_transcript(self, data: dict):
        self._emit_partial_transcript_event(
            text=data["text"],
            participant=self._current_participant,
            response=TranscriptResponse(
                confidence=data["confidence"],
                language=self.language_code,
                model_name=self.model_id,
            )
        )
    
    def _on_committed_transcript(self, data: dict):
        self._emit_transcript_event(
            text=data["text"],
            participant=self._current_participant,
            response=TranscriptResponse(
                confidence=data["confidence"],
                language=self.language_code,
                model_name=self.model_id,
            )
        )
    
    async def close(self):
        await super().close()
        if self.connection:
            await self.connection.close()

Events

STT implementations emit the following events:
  • STTTranscriptEvent - Final transcript
  • STTPartialTranscriptEvent - Partial/interim transcript
  • STTErrorEvent - Errors during processing
  • TurnStartedEvent - User started speaking (if turn detection supported)
  • TurnEndedEvent - User stopped speaking (if turn detection supported)
See Events for more details.
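A sketch of how a consumer might wire up handlers for these events. The real EventManager API is not shown on this page, so the subscribe/emit method names below are assumptions purely for illustration:

```python
from collections import defaultdict

# Minimal event-manager sketch; the real EventManager API may differ
class EventManagerSketch:
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_name, handler):
        self._handlers[event_name].append(handler)

    def emit(self, event_name, payload):
        for handler in self._handlers[event_name]:
            handler(payload)

events = EventManagerSketch()
received = []
# React to final transcripts, e.g. to forward them to an LLM
events.subscribe("STTTranscriptEvent", received.append)
events.emit("STTTranscriptEvent", {"text": "Hello"})
print(received)  # [{'text': 'Hello'}]
```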
