Skip to main content
Processors extend agent capabilities by processing audio or video streams. Use them for custom detection, analysis, transformation, or any real-time media processing.

Processor Base Classes

The framework provides base classes for different processor types:
from vision_agents.core.processors.base_processor import (
    Processor,              # Base for all processors
    VideoProcessor,         # Process incoming video
    VideoPublisher,         # Publish outgoing video
    VideoProcessorPublisher,  # Both process and publish video
    AudioProcessor,         # Process incoming audio
    AudioPublisher,         # Publish outgoing audio
    AudioProcessorPublisher,  # Both process and publish audio
)

Video Processor Publisher

The most common pattern is VideoProcessorPublisher — it receives video, processes it, and publishes the annotated frames back to the call:
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Optional

import aiortc
import av
import cv2
import numpy as np

from vision_agents.core.events import EventManager, PluginBaseEvent
from vision_agents.core.processors.base_processor import VideoProcessorPublisher
from vision_agents.core.utils.video_forwarder import VideoForwarder
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.warmup import Warmable

logger = logging.getLogger(__name__)

@dataclass
class DetectionEvent(PluginBaseEvent):
    """Event emitted after a frame's detection pass finds objects."""

    # Fixed event-type tag; init=False keeps it out of __init__.
    type: str = field(default="detection.completed", init=False)
    # Per-detection dicts with "bbox", "confidence", "class", "label".
    objects: list = field(default_factory=list)
    # Highest confidence across the detections in this frame.
    confidence: float = 0.0

class CustomDetectionProcessor(VideoProcessorPublisher, Warmable[Optional[Any]]):
    """
    Custom object detection processor.

    Pipeline:
      - Receives incoming video frames, sampled at ``fps``.
      - Runs a detection model in a thread pool so CPU-bound work stays
        off the event loop.
      - Annotates frames with bounding boxes and labels.
      - Publishes the annotated video back to the call.
      - Emits a DetectionEvent whenever objects are detected.

    If the model fails to load (or is not loaded yet), frames are passed
    through unmodified.
    """

    name = "custom_detection"

    def __init__(
        self,
        model_path: str = "model.pt",
        conf_threshold: float = 0.5,
        fps: int = 10,
        max_workers: int = 10,
    ):
        """
        Args:
            model_path: Path to the model weights loaded during warmup.
            conf_threshold: Minimum confidence for a detection to be kept.
            fps: Rate at which incoming frames are sampled and processed.
            max_workers: Thread-pool size for CPU-intensive work.
        """
        # NOTE(review): cooperative init added so base/mixin state is set
        # up; the original omitted it. Confirm the bases accept no args.
        super().__init__()

        self.model_path = model_path
        self.conf_threshold = conf_threshold
        self.fps = fps
        self.max_workers = max_workers

        # Outgoing track: annotated frames are queued here for publishing.
        self._video_track = QueuedVideoTrack()
        self._video_forwarder: Optional[VideoForwarder] = None

        # Detection model; populated by on_warmed_up() after warmup.
        self._model: Optional[Any] = None

        # Thread pool for CPU-intensive work (inference, drawing).
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix="custom_detection",
        )
        self._shutdown = False

        # Register the custom event type so subscribers can bind to it.
        self.events = EventManager()
        self.events.register(DetectionEvent)

        # Plain string: no placeholders, so an f-string was unnecessary.
        logger.info("Custom Detection Processor initialized")

    async def on_warmup(self) -> Optional[Any]:
        """
        Load the model during the warmup phase.

        Runs once at startup; the result is cached and delivered back via
        on_warmed_up(). Returns None on failure so the processor degrades
        to passing frames through instead of crashing the agent.
        """
        try:
            from ultralytics import YOLO

            # get_running_loop() is the modern API inside coroutines;
            # get_event_loop() is deprecated here since Python 3.10.
            loop = asyncio.get_running_loop()

            def load_model():
                model = YOLO(self.model_path)
                model.to("cpu")
                return model

            # Model loading is blocking I/O + CPU work: keep it off the loop.
            model = await loop.run_in_executor(self.executor, load_model)
            logger.info("Model loaded: %s", self.model_path)
            return model
        except Exception as e:
            # Best-effort: a missing model disables detection only.
            logger.warning("Model failed to load: %s", e)
            return None

    def on_warmed_up(self, resource: Optional[Any]) -> None:
        """Receive the warmed-up model instance (None if loading failed)."""
        self._model = resource

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """
        Start processing video from the incoming track.

        Args:
            track: Incoming WebRTC video track.
            participant_id: ID of the participant the track belongs to.
            shared_forwarder: Optional forwarder shared with other
                processors; a dedicated one is created when absent.
        """
        logger.info("Starting video processing at %s FPS", self.fps)

        # Reuse a shared forwarder when provided so the same track is not
        # decoded twice; otherwise create a private one.
        self._video_forwarder = shared_forwarder or VideoForwarder(
            track,
            max_buffer=self.fps,
            fps=self.fps,
            name="custom_detection_forwarder",
        )

        # Register the per-frame callback at our target rate.
        self._video_forwarder.add_frame_handler(
            self._process_and_publish_frame,
            fps=float(self.fps),
            name="detection",
        )

    async def _process_and_publish_frame(self, frame: av.VideoFrame) -> None:
        """
        Detect objects in one frame and publish the annotated result.

        Republishes the original frame when shutting down, when no model
        is loaded, or when processing raises.
        """
        if self._shutdown or not self._model:
            await self._video_track.add_frame(frame)
            return

        try:
            # Decode to a BGR numpy array for OpenCV and the model.
            frame_bgr = frame.to_ndarray(format="bgr24")

            # Run inference off the event loop (deprecated get_event_loop()
            # replaced with get_running_loop()).
            loop = asyncio.get_running_loop()
            detections = await loop.run_in_executor(
                self.executor,
                self._detect_objects,
                frame_bgr,
            )

            annotated_frame = self._annotate_frame(frame_bgr, detections)

            output_frame = av.VideoFrame.from_ndarray(
                annotated_frame,
                format="bgr24",
            )

            await self._video_track.add_frame(output_frame)

            # Notify subscribers; confidence is the best detection's score.
            if detections:
                self.events.send(DetectionEvent(
                    plugin_name=self.name,
                    objects=detections,
                    confidence=max(d["confidence"] for d in detections),
                ))

        except Exception:
            logger.exception("Frame processing failed")
            # Keep the stream alive by forwarding the unmodified frame.
            await self._video_track.add_frame(frame)

    def _detect_objects(self, frame_bgr: np.ndarray) -> list:
        """
        Run object detection (synchronous; executed in the thread pool).

        Returns:
            A list of dicts with "bbox" (x1, y1, x2, y2), "confidence",
            integer "class" and human-readable "label".
        """
        if not self._model:
            return []

        results = self._model(
            frame_bgr,
            verbose=False,
            conf=self.conf_threshold,
        )

        detections = []
        if results and results[0].boxes:
            # Hoist the class-name map out of the loop.
            names = results[0].names
            for box in results[0].boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                cls = int(box.cls[0])
                detections.append({
                    "bbox": (int(x1), int(y1), int(x2), int(y2)),
                    "confidence": float(box.conf[0]),
                    "class": cls,
                    "label": names[cls],
                })

        return detections

    def _annotate_frame(
        self,
        frame_bgr: np.ndarray,
        detections: list,
    ) -> np.ndarray:
        """
        Draw bounding boxes and confidence labels on a copy of the frame.
        """
        annotated = frame_bgr.copy()
        green = (0, 255, 0)  # BGR

        for det in detections:
            x1, y1, x2, y2 = det["bbox"]

            cv2.rectangle(annotated, (x1, y1), (x2, y2), green, 2)

            # Label sits just above the box's top-left corner.
            text = f"{det['label']} {det['confidence']:.2f}"
            cv2.putText(
                annotated,
                text,
                (x1, y1 - 5),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                green,
                2,
            )

        return annotated

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return the video track to publish to the call."""
        return self._video_track

    async def _detach_frame_handler(self) -> None:
        """Unregister our per-frame callback from the forwarder, if any."""
        if self._video_forwarder:
            await self._video_forwarder.remove_frame_handler(
                self._process_and_publish_frame
            )

    async def stop_processing(self) -> None:
        """Stop processing video; the output track remains publishable."""
        await self._detach_frame_handler()

    async def close(self) -> None:
        """Clean up resources; the processor cannot be restarted after this."""
        self._shutdown = True
        await self._detach_frame_handler()
        self.executor.shutdown(wait=False)

Using the Processor

from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini

# Construct the processor with an explicit model and sampling rate.
processor = CustomDetectionProcessor(
    model_path="yolo11n.pt",
    conf_threshold=0.5,
    fps=10,
)

# Wire the processor into an agent; the agent routes incoming media
# to each entry in `processors`.
agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Detection Agent", id="agent"),
    instructions="You can see objects detected in the video.",
    llm=gemini.Realtime(fps=3),
    processors=[processor],
)

# React to detections
@agent.events.subscribe
async def on_detection(event: DetectionEvent):
    # Fires for every DetectionEvent the processor emits.
    if len(event.objects) > 0:
        logger.info(f"Detected {len(event.objects)} objects")
        # Trigger agent action
        await agent.simple_response(
            f"I detected {len(event.objects)} objects. What should I do?"
        )

Audio Processor

Process incoming audio data:
from vision_agents.core.processors.base_processor import AudioProcessor
from getstream.video.rtc import PcmData

class CustomAudioProcessor(AudioProcessor):
    """Example processor that inspects incoming PCM audio chunks."""

    name = "custom_audio"

    async def process_audio(self, audio_data: PcmData) -> None:
        """
        Handle one chunk of incoming audio.

        Args:
            audio_data: PcmData carrying raw samples plus metadata; the
                originating participant ID is available as
                ``audio_data.participant``.
        """
        # Unpack the raw samples (numpy array) and stream metadata.
        pcm = audio_data.samples
        rate = audio_data.sample_rate
        n_channels = audio_data.channels

        # Your audio processing logic
        logger.info(
            f"Processing {len(pcm)} samples at {rate}Hz"
        )

    async def close(self) -> None:
        """No resources to release in this example."""
        pass

Warmable Resources

Use Warmable mixin to load heavy resources (models, databases) once at startup:
from vision_agents.core.warmup import Warmable

class MyProcessor(VideoProcessorPublisher, Warmable[YourModelType]):
    """Skeleton showing the Warmable resource-loading lifecycle."""

    def __init__(self):
        # Populated later by on_warmed_up(); None until warmup completes.
        self._model: Optional[YourModelType] = None
    
    async def on_warmup(self) -> YourModelType:
        """
        Load resource once at startup.
        Runs in thread pool automatically.
        Result is cached globally and shared between instances.
        """
        model = YourModelType.load("path/to/model")
        return model
    
    def on_warmed_up(self, model: YourModelType) -> None:
        """
        Receive the warmed-up resource.
        Called every time a new agent is initialized.
        """
        self._model = model
Warmable resources are loaded once and shared between all processor instances in the application. This improves startup time and memory usage.

Event System

Emit custom events for other components to react to:
from vision_agents.core.events import EventManager, PluginBaseEvent
from dataclasses import dataclass, field

@dataclass
class MyCustomEvent(PluginBaseEvent):
    """Example custom event; subscribers bind to it by type annotation."""

    # Fixed event-type tag; init=False keeps it out of __init__.
    type: str = field(default="my.custom.event", init=False)
    data: str = ""  # free-form payload for subscribers
    confidence: float = 0.0  # score attached by the emitter

class MyProcessor(VideoProcessorPublisher):
    """Skeleton showing how a processor emits custom events."""

    def __init__(self):
        # Event types must be registered before they can be sent.
        self.events = EventManager()
        self.events.register(MyCustomEvent)
    
    async def _process_frame(self, frame):
        # ... processing logic ...
        
        # Emit event
        self.events.send(MyCustomEvent(
            plugin_name=self.name,
            data="Something detected",
            confidence=0.95,
        ))
Subscribe to events at the agent level:
# Runs whenever a MyCustomEvent is emitted by any processor on the agent.
@agent.events.subscribe
async def on_my_event(event: MyCustomEvent):
    logger.info(f"Event received: {event.data}")

Production Best Practices

1. Use Thread Pools

Always run CPU-intensive work in thread pools:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
    self.executor,
    self._heavy_computation,
    data
)
2. Handle Shutdown Gracefully

async def close(self) -> None:
    """Signal in-flight work to stop, then tear down the thread pool."""
    self._shutdown = True
    # wait=False: do not block the event loop on pending pool tasks.
    self.executor.shutdown(wait=False)
3. Use QueuedVideoTrack

QueuedVideoTrack automatically handles backpressure and frame drops:
# Bounded queue: older frames are dropped under backpressure.
self._video_track = QueuedVideoTrack(
    max_buffer=30  # Keep 30 frames max
)
4. Error Handling

try:
    processed = await self._process_frame(frame)
except Exception:
    # Log the full traceback, then degrade gracefully instead of
    # dropping the frame and stalling the output track.
    logger.exception("Processing failed")
    # Return original frame on error
    await self._video_track.add_frame(frame)

Complete Examples

  • Security Camera: examples/05_security_camera_example/security_camera_processor.py
  • YOLO Pose: plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py
  • Roboflow: plugins/roboflow/vision_agents/plugins/roboflow/roboflow_local_processor.py

Next Steps

Build docs developers (and LLMs) love