Audio Processors
Audio processor classes handle incoming audio streams, process audio data, and optionally publish processed audio back to the call. Use these for audio analysis, transcription, voice activity detection, or audio transformation.

AudioProcessor

Processes incoming audio data without publishing audio back.
from vision_agents.core.processors.base_processor import AudioProcessor

class AudioProcessor(Processor, metaclass=abc.ABCMeta):
    """
    Base class for all audio processors that process incoming audio tracks.

    Subclasses override ``process_audio`` to receive each incoming chunk of
    PCM audio; this base does not publish any audio back to the call.
    """

Methods

process_audio

async def process_audio(self, audio_data: PcmData) -> None:
    """Process audio data. Override this method to implement audio processing.

    Called once per chunk of incoming PCM audio; ``audio_data`` carries the
    samples plus sample rate, channel count, and participant metadata.
    """
Called for each chunk of incoming audio data. Process the audio samples, extract features, or perform analysis.
Parameters: `audio_data` (`PcmData`, required) — PCM audio data containing samples and metadata.
PcmData Structure:
  • audio_data.samples - numpy array of audio samples
  • audio_data.sample_rate - Sample rate in Hz (typically 16000 or 48000)
  • audio_data.channels - Number of audio channels (1 for mono, 2 for stereo)
  • audio_data.participant - Participant information (ID, name, etc.)
Example:
async def process_audio(self, audio_data: PcmData) -> None:
    """Log chunk metadata and flag chunks with a high mean amplitude."""
    # Unpack the fields we need off the incoming chunk.
    samples = audio_data.samples  # numpy array
    sample_rate = audio_data.sample_rate
    channels = audio_data.channels
    participant = audio_data.participant

    who = participant.id if participant else 'unknown'
    logger.info(
        f"Processing {len(samples)} samples at {sample_rate}Hz "
        f"from {who}"
    )

    # Mean absolute amplitude as a cheap loudness proxy.
    volume = np.abs(samples).mean()
    if volume > 0.1:
        logger.info(f"High volume detected: {volume:.2f}")

AudioPublisher

Publishes outgoing audio tracks without processing incoming audio.
from vision_agents.core.processors.base_processor import AudioPublisher

class AudioPublisher(Processor, metaclass=abc.ABCMeta):
    """
    Base class for audio processors that publish outgoing audio tracks.

    Subclasses implement ``publish_audio_track``; this base does not
    receive incoming audio.
    """

Methods

publish_audio_track

def publish_audio_track(self) -> aiortc.AudioStreamTrack:
    """Returns an audio track to publish to the call.

    Called by the agent when setting up media streams.
    """
Return the audio track to publish. Called by the agent when setting up media streams. Example:
import aiortc

class AudioGeneratorProcessor(AudioPublisher):
    """Example publisher exposing a raw aiortc audio track to the call."""

    name = "audio_generator"
    
    def __init__(self):
        # Create the track once so publish_audio_track can hand out the
        # same instance on every call.
        self._audio_track = aiortc.AudioStreamTrack()
    
    def publish_audio_track(self) -> aiortc.AudioStreamTrack:
        """Return the track the agent should publish to the call."""
        return self._audio_track

AudioProcessorPublisher

Combines AudioProcessor and AudioPublisher - processes incoming audio and publishes processed audio back.
from vision_agents.core.processors.base_processor import AudioProcessorPublisher

class AudioProcessorPublisher(AudioPublisher, AudioProcessor, metaclass=abc.ABCMeta):
    """
    Base class for audio processors that both process incoming audio tracks
    and publish the audio back to the call.

    Useful for audio effects, noise cancellation, or voice transformation.
    """
This is useful for audio effects, noise cancellation, or voice transformation.

Complete Example: Audio Level Monitor

import logging
import numpy as np
from collections import deque
from getstream.video.rtc import PcmData
from vision_agents.core.processors.base_processor import AudioProcessor
from vision_agents.core.events import EventManager, PluginBaseEvent
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

@dataclass
class AudioLevelEvent(PluginBaseEvent):
    """Event emitted when audio level changes significantly."""
    # Fixed event type string; excluded from __init__.
    type: str = field(default="audio.level_change", init=False)
    # ID of the participant whose speaking state changed ("" if unknown).
    participant_id: str = ""
    # Smoothed (windowed average) RMS volume at the time of the change.
    volume: float = 0.0
    # True when the participant just crossed into speaking.
    is_speaking: bool = False

class AudioLevelMonitor(AudioProcessor):
    """
    Monitors audio levels and detects when participants are speaking.

    Keeps a rolling window of per-participant RMS volumes and emits an
    AudioLevelEvent each time a participant's smoothed volume crosses
    speech_threshold in either direction.
    """
    
    name = "audio_level_monitor"
    
    def __init__(
        self,
        speech_threshold: float = 0.02,
        window_size: int = 10,
    ):
        """
        Args:
            speech_threshold: Average RMS volume (on a [-1, 1] sample scale)
                above which a participant counts as speaking.
            window_size: Number of recent chunks averaged when smoothing.
        """
        self.speech_threshold = speech_threshold
        self.window_size = window_size
        
        # Per-participant rolling volume window and last known speaking state
        self._volume_history: dict[str, deque] = {}
        self._speaking_state: dict[str, bool] = {}
        
        # Event system
        self.events = EventManager()
        self.events.register(AudioLevelEvent)
        
        logger.info("Audio Level Monitor initialized")
    
    async def process_audio(self, audio_data: PcmData) -> None:
        """
        Process audio data and detect speech activity.
        """
        raw = np.asarray(audio_data.samples)
        if raw.size == 0:
            # Empty chunk: np.mean([]) would warn and yield NaN.
            return
        
        # Calculate volume (RMS). Cast before squaring: samples may arrive
        # as int16, and squaring int16 overflows and corrupts the RMS.
        # Integer PCM is normalized to [-1, 1] so speech_threshold means
        # the same thing for int16 and float32 input.
        samples = raw.astype(np.float64)
        if np.issubdtype(raw.dtype, np.integer):
            samples /= np.iinfo(raw.dtype).max
        volume = float(np.sqrt(np.mean(samples ** 2)))
        
        # Get participant ID ("unknown" covers the agent's own audio,
        # where participant may be None)
        participant_id = "unknown"
        if audio_data.participant:
            participant_id = audio_data.participant.id
        
        # Initialize history for new participants
        if participant_id not in self._volume_history:
            self._volume_history[participant_id] = deque(maxlen=self.window_size)
            self._speaking_state[participant_id] = False
        
        # Update volume history
        self._volume_history[participant_id].append(volume)
        
        # Calculate average volume over window
        avg_volume = float(np.mean(self._volume_history[participant_id]))
        
        # Detect speech state change
        is_speaking = avg_volume > self.speech_threshold
        was_speaking = self._speaking_state[participant_id]
        
        if is_speaking != was_speaking:
            self._speaking_state[participant_id] = is_speaking
            
            # Emit event on state change
            self.events.send(AudioLevelEvent(
                plugin_name=self.name,
                participant_id=participant_id,
                volume=avg_volume,
                is_speaking=is_speaking,
            ))
            
            logger.info(
                f"Participant {participant_id} "
                f"{'started' if is_speaking else 'stopped'} speaking "
                f"(volume: {avg_volume:.3f})"
            )
    
    async def close(self) -> None:
        """Clean up per-participant state."""
        self._volume_history.clear()
        self._speaking_state.clear()
        logger.info("Audio Level Monitor closed")

Usage

from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini

monitor = AudioLevelMonitor(
    speech_threshold=0.02,  # smoothed RMS above this counts as speech
    window_size=10,         # chunks averaged when smoothing
)

# Attach the monitor to an agent as a processor.
agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Audio Agent", id="agent"),
    instructions="Monitor audio levels in the call.",
    llm=gemini.Realtime(),
    processors=[monitor],
)

# React to audio level events
@agent.events.subscribe
async def on_audio_level(event: AudioLevelEvent):
    if event.is_speaking:
        logger.info(f"{event.participant_id} started speaking")
        await agent.simple_response(
            f"I hear {event.participant_id} speaking."
        )

# NOTE: top-level await only works inside an async entry point
# (e.g. wrap this in `async def main()` and run with asyncio.run).
await agent.start()

Example: Simple Audio Logger

import logging
from getstream.video.rtc import PcmData
from vision_agents.core.processors.base_processor import AudioProcessor

logger = logging.getLogger(__name__)

class AudioLogger(AudioProcessor):
    """Logs basic information about incoming audio."""
    
    name = "audio_logger"
    
    def __init__(self):
        self._total_samples = 0
        # Accumulate duration chunk by chunk, using each chunk's actual
        # sample rate, instead of assuming a fixed rate at close() time.
        self._total_duration_s = 0.0
        logger.info("Audio Logger initialized")
    
    async def process_audio(self, audio_data: PcmData) -> None:
        """
        Log audio metadata for one incoming chunk.
        """
        samples = audio_data.samples
        sample_rate = audio_data.sample_rate
        channels = audio_data.channels
        
        self._total_samples += len(samples)
        
        # Duration of this chunk.
        # NOTE(review): assumes len(samples) counts frames — confirm
        # against PcmData for interleaved multi-channel audio.
        duration_ms = (len(samples) / sample_rate) * 1000
        self._total_duration_s += duration_ms / 1000
        
        logger.info(
            f"Audio chunk: {len(samples)} samples, "
            f"{sample_rate}Hz, {channels}ch, "
            f"{duration_ms:.1f}ms "
            f"(total: {self._total_samples} samples)"
        )
    
    async def close(self) -> None:
        """Log the total duration observed across all chunks."""
        logger.info(f"Audio Logger closed. Total duration: {self._total_duration_s:.1f}s")

Working with Audio Data

Audio Format

PcmData provides raw PCM audio samples:
async def process_audio(self, audio_data: PcmData) -> None:
    """Illustrates the fields available on an incoming PcmData chunk."""
    # Audio samples as numpy array (dtype: int16 or float32)
    samples = audio_data.samples
    
    # Sample rate (Hz)
    sample_rate = audio_data.sample_rate  # e.g., 16000, 48000
    
    # Number of channels
    channels = audio_data.channels  # 1 = mono, 2 = stereo
    
    # Participant information (may be None, e.g. for the agent's own audio)
    if audio_data.participant:
        participant_id = audio_data.participant.id
        participant_name = audio_data.participant.name

Common Audio Processing Tasks

Calculate Volume (RMS):
import numpy as np

# Cast before squaring: int16 samples overflow under ** 2, corrupting the RMS.
volume = np.sqrt(np.mean(audio_data.samples.astype(np.float64) ** 2))
Detect Silence:
# A chunk counts as silent when its peak absolute amplitude stays
# below the threshold.
silence_threshold = 0.01
max_amplitude = np.abs(audio_data.samples).max()
is_silent = max_amplitude < silence_threshold
Convert to Different Sample Rate:
import scipy.signal

original_rate = audio_data.sample_rate
target_rate = 16000

# Polyphase resampling avoids the edge artifacts that FFT-based
# scipy.signal.resample (which assumes a periodic signal) introduces
# on short, non-periodic chunks of speech.
resampled = scipy.signal.resample_poly(
    audio_data.samples,
    target_rate,
    original_rate,
)
Extract Audio Features:
import librosa

# Convert to float32 if needed
samples_float = audio_data.samples.astype(np.float32)

# Extract MFCC features
mfccs = librosa.feature.mfcc(
    y=samples_float,
    sr=audio_data.sample_rate,
    n_mfcc=13
)

Best Practices

1. Handle Different Sample Rates

Audio data may arrive at different sample rates. Check and handle accordingly:
if audio_data.sample_rate != self.expected_rate:
    logger.warning(f"Unexpected sample rate: {audio_data.sample_rate}")
2. Process Audio Efficiently

Audio processing happens frequently. Keep it lightweight:
# Good: Simple calculations
volume = np.abs(audio_data.samples).mean()

# Bad: Heavy processing in main loop
# await expensive_ml_inference(audio_data)
3. Use Thread Pools for Heavy Work

Run expensive audio processing in thread pools:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
    self.executor,
    self._heavy_audio_processing,
    audio_data.samples
)
4. Handle Missing Participant Info

Participant data may be None for the agent’s own audio:
participant_id = "agent"
if audio_data.participant:
    participant_id = audio_data.participant.id

Common Use Cases

Voice Activity Detection (VAD): Detect when participants are speaking vs. silent. Audio Transcription: Send audio chunks to a speech-to-text service. Noise Analysis: Monitor background noise levels, detect audio quality issues. Audio Recording: Save audio chunks to disk for analysis or archival. Real-time Audio Effects: Apply filters, noise cancellation, or echo suppression.

See Also

Build docs developers (and LLMs) love