Audio processor classes handle incoming audio streams, process audio data, and optionally publish processed audio back to the call. Use these for audio analysis, transcription, voice activity detection, or audio transformation.
AudioProcessor
Processes incoming audio data without publishing audio back.
from vision_agents.core.processors.base_processor import AudioProcessor
class AudioProcessor(Processor, metaclass=abc.ABCMeta):
"""
Base class for all audio processors that process incoming audio tracks.
"""
Methods
process_audio
async def process_audio(self, audio_data: PcmData) -> None:
"""Process audio data. Override this method to implement audio processing."""
Called for each chunk of incoming audio data. Process the audio samples, extract features, or perform analysis.
PCM audio data containing samples and metadata
PcmData Structure:
audio_data.samples - numpy array of audio samples
audio_data.sample_rate - Sample rate in Hz (typically 16000 or 48000)
audio_data.channels - Number of audio channels (1 for mono, 2 for stereo)
audio_data.participant - Participant information (ID, name, etc.)
Example:
async def process_audio(self, audio_data: PcmData) -> None:
# Access audio samples
samples = audio_data.samples # numpy array
sample_rate = audio_data.sample_rate
channels = audio_data.channels
participant = audio_data.participant
logger.info(
f"Processing {len(samples)} samples at {sample_rate}Hz "
f"from {participant.id if participant else 'unknown'}"
)
# Your audio processing logic
volume = np.abs(samples).mean()
if volume > 0.1:
logger.info(f"High volume detected: {volume:.2f}")
AudioPublisher
Publishes outgoing audio tracks without processing incoming audio.
from vision_agents.core.processors.base_processor import AudioPublisher
class AudioPublisher(Processor, metaclass=abc.ABCMeta):
"""
Base class for audio processors that publish outgoing audio tracks.
"""
Methods
publish_audio_track
def publish_audio_track(self) -> aiortc.AudioStreamTrack:
"""Returns an audio track to publish to the call."""
Return the audio track to publish. Called by the agent when setting up media streams.
Example:
import aiortc
class AudioGeneratorProcessor(AudioPublisher):
name = "audio_generator"
def __init__(self):
self._audio_track = aiortc.AudioStreamTrack()
def publish_audio_track(self) -> aiortc.AudioStreamTrack:
return self._audio_track
AudioProcessorPublisher
Combines AudioProcessor and AudioPublisher - processes incoming audio and publishes processed audio back.
from vision_agents.core.processors.base_processor import AudioProcessorPublisher
class AudioProcessorPublisher(AudioPublisher, AudioProcessor, metaclass=abc.ABCMeta):
"""
Base class for audio processors that both process incoming audio tracks
and publish the audio back to the call.
"""
This is useful for audio effects, noise cancellation, or voice transformation.
Complete Example: Audio Level Monitor
import logging
import numpy as np
from collections import deque
from getstream.video.rtc import PcmData
from vision_agents.core.processors.base_processor import AudioProcessor
from vision_agents.core.events import EventManager, PluginBaseEvent
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class AudioLevelEvent(PluginBaseEvent):
    """Event emitted when audio level changes significantly."""

    # Fixed event-type identifier; init=False so callers cannot override it.
    type: str = field(default="audio.level_change", init=False)
    # ID of the participant whose speaking state changed ("" when unknown).
    participant_id: str = ""
    # Average RMS volume over the monitor's sliding window at the transition.
    volume: float = 0.0
    # True when the participant started speaking, False when they stopped.
    is_speaking: bool = False
class AudioLevelMonitor(AudioProcessor):
    """
    Monitors audio levels and detects when participants are speaking.

    Keeps a sliding window of recent RMS volumes per participant and emits an
    AudioLevelEvent whenever a participant's averaged volume crosses
    ``speech_threshold`` (i.e. on each speaking/silent transition).
    """

    name = "audio_level_monitor"

    def __init__(
        self,
        speech_threshold: float = 0.02,
        window_size: int = 10,
    ):
        """
        Args:
            speech_threshold: Average RMS volume above which a participant is
                considered to be speaking.
            window_size: Number of recent chunks to average the volume over.
        """
        self.speech_threshold = speech_threshold
        self.window_size = window_size
        # Per-participant sliding window of recent RMS volumes.
        self._volume_history: dict[str, deque] = {}
        # Per-participant last observed speaking state (for edge detection).
        self._speaking_state: dict[str, bool] = {}
        # Event system
        self.events = EventManager()
        self.events.register(AudioLevelEvent)
        logger.info("Audio Level Monitor initialized")

    async def process_audio(self, audio_data: PcmData) -> None:
        """
        Process one chunk of audio and emit an event on speech-state changes.
        """
        # Calculate volume (RMS). Cast to float64 first: squaring raw int16
        # samples keeps int16 dtype and silently overflows, corrupting the
        # RMS value. Guard against an empty chunk (mean of empty is NaN).
        samples = np.asarray(audio_data.samples, dtype=np.float64)
        volume = float(np.sqrt(np.mean(samples ** 2))) if samples.size else 0.0
        # Get participant ID ("unknown" when no participant info is attached).
        participant_id = "unknown"
        if audio_data.participant:
            participant_id = audio_data.participant.id
        # Initialize history for new participants
        if participant_id not in self._volume_history:
            self._volume_history[participant_id] = deque(maxlen=self.window_size)
            self._speaking_state[participant_id] = False
        # Update volume history
        self._volume_history[participant_id].append(volume)
        # Calculate average volume over window
        avg_volume = float(np.mean(list(self._volume_history[participant_id])))
        # Detect speech state change
        is_speaking = avg_volume > self.speech_threshold
        was_speaking = self._speaking_state[participant_id]
        if is_speaking != was_speaking:
            self._speaking_state[participant_id] = is_speaking
            # Emit event on state change
            self.events.send(AudioLevelEvent(
                plugin_name=self.name,
                participant_id=participant_id,
                volume=avg_volume,
                is_speaking=is_speaking,
            ))
            logger.info(
                f"Participant {participant_id} "
                f"{'started' if is_speaking else 'stopped'} speaking "
                f"(volume: {avg_volume:.3f})"
            )

    async def close(self) -> None:
        """Clean up resources."""
        self._volume_history.clear()
        self._speaking_state.clear()
        logger.info("Audio Level Monitor closed")
Usage
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini
monitor = AudioLevelMonitor(
speech_threshold=0.02,
window_size=10,
)
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="Audio Agent", id="agent"),
instructions="Monitor audio levels in the call.",
llm=gemini.Realtime(),
processors=[monitor],
)
# React to audio level events
@agent.events.subscribe
async def on_audio_level(event: AudioLevelEvent):
if event.is_speaking:
logger.info(f"{event.participant_id} started speaking")
await agent.simple_response(
f"I hear {event.participant_id} speaking."
)
await agent.start()
Example: Simple Audio Logger
import logging
from getstream.video.rtc import PcmData
from vision_agents.core.processors.base_processor import AudioProcessor
logger = logging.getLogger(__name__)
class AudioLogger(AudioProcessor):
    """Logs basic information about incoming audio."""

    name = "audio_logger"

    def __init__(self):
        # Running count of samples seen across all chunks.
        self._total_samples = 0
        # Accumulate total duration per chunk using each chunk's actual
        # sample rate, instead of assuming a fixed 48 kHz at close time.
        self._total_duration_s = 0.0
        logger.info("Audio Logger initialized")

    async def process_audio(self, audio_data: PcmData) -> None:
        """
        Log metadata for one chunk of incoming audio.
        """
        samples = audio_data.samples
        sample_rate = audio_data.sample_rate
        channels = audio_data.channels
        self._total_samples += len(samples)
        # Calculate duration of this chunk from its own sample rate.
        duration_ms = (len(samples) / sample_rate) * 1000
        self._total_duration_s += duration_ms / 1000.0
        logger.info(
            f"Audio chunk: {len(samples)} samples, "
            f"{sample_rate}Hz, {channels}ch, "
            f"{duration_ms:.1f}ms "
            f"(total: {self._total_samples} samples)"
        )

    async def close(self) -> None:
        """Log the accumulated total duration on shutdown."""
        # Uses the per-chunk accumulated duration, which stays correct even
        # when chunks arrive at mixed sample rates (e.g. 16 kHz and 48 kHz).
        logger.info(
            f"Audio Logger closed. Total duration: {self._total_duration_s:.1f}s"
        )
Working with Audio Data
PcmData provides raw PCM audio samples:
async def process_audio(self, audio_data: PcmData) -> None:
# Audio samples as numpy array (dtype: int16 or float32)
samples = audio_data.samples
# Sample rate (Hz)
sample_rate = audio_data.sample_rate # e.g., 16000, 48000
# Number of channels
channels = audio_data.channels # 1 = mono, 2 = stereo
# Participant information
if audio_data.participant:
participant_id = audio_data.participant.id
participant_name = audio_data.participant.name
Common Audio Processing Tasks
Calculate Volume (RMS):
import numpy as np
volume = np.sqrt(np.mean(audio_data.samples ** 2))
Detect Silence:
silence_threshold = 0.01
max_amplitude = np.abs(audio_data.samples).max()
is_silent = max_amplitude < silence_threshold
Convert to Different Sample Rate:
import scipy.signal
original_rate = audio_data.sample_rate
target_rate = 16000
resampled = scipy.signal.resample(
audio_data.samples,
int(len(audio_data.samples) * target_rate / original_rate)
)
Extract Audio Features:
import librosa
# Convert to float32 if needed
samples_float = audio_data.samples.astype(np.float32)
# Extract MFCC features
mfccs = librosa.feature.mfcc(
y=samples_float,
sr=audio_data.sample_rate,
n_mfcc=13
)
Best Practices
Handle Different Sample Rates
Audio data may arrive at different sample rates. Check and handle accordingly:

if audio_data.sample_rate != self.expected_rate:
    logger.warning(f"Unexpected sample rate: {audio_data.sample_rate}")
Process Audio Efficiently
Audio processing happens frequently. Keep it lightweight:

# Good: simple calculations
volume = np.abs(audio_data.samples).mean()

# Bad: heavy processing in the main loop
# await expensive_ml_inference(audio_data)
Use Thread Pools for Heavy Work
Run expensive audio processing in thread pools:

loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
    self.executor,
    self._heavy_audio_processing,
    audio_data.samples
)
Handle Missing Participant Info
Participant data may be None for the agent's own audio:

participant_id = "agent"
if audio_data.participant:
    participant_id = audio_data.participant.id
Common Use Cases
Voice Activity Detection (VAD):
Detect when participants are speaking vs. silent.
Audio Transcription:
Send audio chunks to a speech-to-text service.
Noise Analysis:
Monitor background noise levels, detect audio quality issues.
Audio Recording:
Save audio chunks to disk for analysis or archival.
Real-time Audio Effects:
Apply filters, noise cancellation, or echo suppression.
See Also