Skip to main content
Video processor classes handle incoming video streams, process frames, and optionally publish processed video back to the call. Use these for object detection, pose estimation, frame annotation, or any real-time video analysis.

VideoProcessor

Processes incoming video tracks without publishing video back.
from vision_agents.core.processors.base_processor import VideoProcessor

class VideoProcessor(Processor, metaclass=abc.ABCMeta):
    """
    Base class for video processors that process incoming video tracks.

    Subclasses consume incoming video and do not publish video back to the call.
    Example: a plugin that logs video frames for analysis.
    """

Methods

process_video

async def process_video(
    self,
    track: aiortc.VideoStreamTrack,
    participant_id: Optional[str],
    shared_forwarder: Optional[VideoForwarder] = None,
) -> None:
    """Start processing a video track.

    Called by the agent when a new video track is published.

    Args:
        track: The incoming video stream track to process.
        participant_id: ID of the participant publishing the video, or
            None for the agent's own track.
        shared_forwarder: Optional shared video forwarder for
            multi-processor setups. When provided, use it instead of
            creating your own.
    """
Called by the agent when a new video track is published. Set up your frame processing pipeline here.
track
aiortc.VideoStreamTrack
required
The incoming video stream track to process
participant_id
str | None
required
ID of the participant publishing the video, or None for the agent’s own track
shared_forwarder
VideoForwarder | None
Optional shared video forwarder for efficient multi-processor setups. If provided, use this instead of creating your own.
Example:
async def process_video(
    self,
    track: aiortc.VideoStreamTrack,
    participant_id: Optional[str],
    shared_forwarder: Optional[VideoForwarder] = None,
) -> None:
    """Attach this processor's frame handler to a video forwarder.

    Reuses ``shared_forwarder`` when one is supplied; otherwise builds a
    dedicated forwarder for ``track`` running at ``self.fps``.
    """
    logger.info(f"Starting video processing for participant {participant_id}")

    forwarder = shared_forwarder
    if not forwarder:
        # No shared forwarder was supplied: create one owned by this processor.
        forwarder = VideoForwarder(
            track,
            max_buffer=self.fps,
            fps=self.fps,
            name="my_processor",
        )
    self._video_forwarder = forwarder

    # Deliver frames to the handler at the processor's frame rate.
    forwarder.add_frame_handler(
        self._process_frame,
        fps=float(self.fps),
        name="frame_processor",
    )

stop_processing

async def stop_processing(self) -> None:
    """Stop processing video. Called when all video tracks are removed.

    Implementations clean up their frame handlers here.
    """
Clean up frame handlers and stop processing. Called when all video tracks are removed, for example when the video track ends or the participant leaves. Example:
async def stop_processing(self) -> None:
    """Detach the frame handler from the current forwarder, if any."""
    forwarder = self._video_forwarder
    if forwarder:
        await forwarder.remove_frame_handler(self._process_frame)
        # Drop the reference so the forwarder is not reused after detaching.
        self._video_forwarder = None
    logger.info("Video processing stopped")

VideoPublisher

Publishes outgoing video tracks without processing incoming video.
from vision_agents.core.processors.base_processor import VideoPublisher

class VideoPublisher(Processor, metaclass=abc.ABCMeta):
    """
    Base class for video processors that publish outgoing video tracks.

    Subclasses publish video to the call without processing incoming video.
    Example: avatar plugin generating video on the fly.
    """

Methods

publish_video_track

def publish_video_track(self) -> aiortc.VideoStreamTrack:
    """Returns a video track with the processed frames.

    Called by the agent when setting up media streams; the returned track
    is the one published to the call.
    """
Return the video track to publish to the call. Called by the agent when setting up media streams. Example:
from vision_agents.core.utils.video_track import QueuedVideoTrack

def __init__(self):
    # Outgoing track handed to the agent by publish_video_track().
    self._video_track = QueuedVideoTrack()

def publish_video_track(self) -> aiortc.VideoStreamTrack:
    """Return the track to publish to the call."""
    return self._video_track

# Add frames to the track elsewhere:
async def _generate_frame(self):
    """Wrap an image array in a video frame and add it to the outgoing track."""
    # NOTE(review): image_array is not defined in this snippet; presumably a
    # BGR ndarray produced by the caller (format is "bgr24"). Confirm.
    frame = av.VideoFrame.from_ndarray(image_array, format="bgr24")
    await self._video_track.add_frame(frame)

VideoProcessorPublisher

Combines VideoProcessor and VideoPublisher: it processes incoming video and publishes the processed video back to the call.
from vision_agents.core.processors.base_processor import VideoProcessorPublisher

class VideoProcessorPublisher(VideoProcessor, VideoPublisher, metaclass=abc.ABCMeta):
    """
    Base class for video processors that process incoming video tracks and
    publish the video back to the call.

    Inherits from both VideoProcessor and VideoPublisher.
    Example: object detection plugin that annotates video frames.
    """
This is the most commonly used video processor type. It inherits all methods from both VideoProcessor and VideoPublisher.

Complete Example: Object Detection Processor

import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Any

import aiortc
import av
import cv2
import numpy as np

from vision_agents.core.processors.base_processor import VideoProcessorPublisher
from vision_agents.core.utils.video_forwarder import VideoForwarder
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.warmup import Warmable

logger = logging.getLogger(__name__)

class ObjectDetectionProcessor(VideoProcessorPublisher, Warmable[Optional[Any]]):
    """
    Detects objects in video frames and annotates them with bounding boxes.

    Incoming frames are run through a YOLO model on a thread pool, annotated
    with OpenCV, and added to a ``QueuedVideoTrack`` that is returned by
    ``publish_video_track``. The original frame is passed through unchanged
    when no model is loaded, when the processor is shutting down, or when
    processing a frame raises.
    """

    name = "object_detection"

    def __init__(
        self,
        model_path: str = "yolo11n.pt",
        conf_threshold: float = 0.5,
        fps: int = 10,
        max_workers: int = 10,
    ):
        """
        Args:
            model_path: Weights path/name passed to ``YOLO(...)`` during warmup.
            conf_threshold: Confidence threshold passed to the model as ``conf``.
            fps: Frame rate used for the forwarder and the frame handler.
            max_workers: Number of worker threads in the processor's pool.
        """
        self.model_path = model_path
        self.conf_threshold = conf_threshold
        self.fps = fps

        # Video output
        self._video_track = QueuedVideoTrack()
        self._video_forwarder: Optional[VideoForwarder] = None

        # Model (loaded in warmup)
        self._model: Optional[Any] = None

        # Thread pool for CPU work
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix="object_detection"
        )
        self._shutdown = False

        logger.info("Object Detection Processor initialized")

    async def on_warmup(self) -> Optional[Any]:
        """Load the YOLO model during warmup.

        Returns:
            The loaded model, or None when importing or loading it fails.
        """
        try:
            from ultralytics import YOLO

            # get_running_loop() is the preferred way to obtain the loop from
            # inside a coroutine.
            loop = asyncio.get_running_loop()
            # Model construction is blocking, so run it on the thread pool.
            model = await loop.run_in_executor(
                self.executor,
                YOLO,
                self.model_path,
            )
            logger.info(f"Model loaded: {self.model_path}")
            return model
        except Exception as e:
            # Best effort: without a model, frames are passed through as-is.
            logger.warning(f"Model load failed: {e}")
            return None

    def on_warmed_up(self, resource: Optional[Any]) -> None:
        """Receive the loaded model (or None) and store it on the instance."""
        self._model = resource

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """Start processing incoming video.

        Args:
            track: Incoming video track; used only when no shared forwarder
                is supplied.
            participant_id: ID of the publishing participant (unused here).
            shared_forwarder: Forwarder to attach to instead of creating one.
        """
        # A handler from a previous call may still be attached; detach it
        # first so frames are never processed by a duplicate registration.
        await self.stop_processing()

        logger.info(f"Starting video processing at {self.fps} FPS")

        self._video_forwarder = shared_forwarder or VideoForwarder(
            track,
            max_buffer=self.fps,
            fps=self.fps,
            name="detection_forwarder",
        )

        self._video_forwarder.add_frame_handler(
            self._process_and_publish,
            fps=float(self.fps),
            name="detection",
        )

    async def _process_and_publish(self, frame: av.VideoFrame) -> None:
        """Process a single frame and publish the result.

        Publishes the original frame when shutting down, when no model is
        loaded, or when detection/annotation raises.
        """
        if self._shutdown or self._model is None:
            await self._video_track.add_frame(frame)
            return

        try:
            # Convert to numpy
            frame_bgr = frame.to_ndarray(format="bgr24")

            # Detect objects in thread pool
            loop = asyncio.get_running_loop()
            detections = await loop.run_in_executor(
                self.executor,
                self._detect_objects,
                frame_bgr
            )

            # Annotate frame
            annotated = self._draw_boxes(frame_bgr, detections)

            # Convert back and publish
            output = av.VideoFrame.from_ndarray(annotated, format="bgr24")
            await self._video_track.add_frame(output)

        except Exception:
            # Keep the published video flowing with the unannotated frame.
            logger.exception("Frame processing failed")
            await self._video_track.add_frame(frame)

    def _detect_objects(self, frame_bgr: np.ndarray) -> list:
        """Run detection (sync, runs in thread pool).

        Returns:
            A list of dicts with keys ``"bbox"`` (x1, y1, x2, y2 ints),
            ``"conf"`` (float) and ``"label"`` (class name); empty when no
            model is loaded or nothing is detected.
        """
        model = self._model
        if model is None:
            return []

        results = model(
            frame_bgr,
            verbose=False,
            conf=self.conf_threshold,
        )

        detections = []
        if results and results[0].boxes:
            names = results[0].names
            for box in results[0].boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                detections.append({
                    "bbox": (int(x1), int(y1), int(x2), int(y2)),
                    "conf": float(box.conf[0]),
                    "label": names[int(box.cls[0])],
                })

        return detections

    def _draw_boxes(self, frame: np.ndarray, detections: list) -> np.ndarray:
        """Draw bounding boxes and labels on a copy of the frame.

        Args:
            frame: Source image; it is not modified.
            detections: Dicts as produced by ``_detect_objects``.

        Returns:
            The annotated copy.
        """
        result = frame.copy()
        for det in detections:
            x1, y1, x2, y2 = det["bbox"]
            cv2.rectangle(result, (x1, y1), (x2, y2), (0, 255, 0), 2)

            text = f"{det['label']} {det['conf']:.2f}"
            # Keep the label inside the frame when the box touches the top edge.
            label_y = max(y1 - 5, 15)
            cv2.putText(result, text, (x1, label_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return result

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return video track to publish."""
        return self._video_track

    async def stop_processing(self) -> None:
        """Stop video processing.

        Safe to call repeatedly: once the handler has been detached the
        forwarder reference is cleared and later calls do nothing.
        """
        forwarder = self._video_forwarder
        if forwarder is None:
            return

        # Clear the reference before awaiting so an overlapping call does not
        # try to remove the same handler a second time.
        self._video_forwarder = None
        await forwarder.remove_frame_handler(
            self._process_and_publish
        )
        logger.info("Video processing stopped")

    async def close(self) -> None:
        """Clean up resources: stop frame handling and shut down the pool."""
        self._shutdown = True
        await self.stop_processing()
        # Do not block the event loop waiting for in-flight work.
        self.executor.shutdown(wait=False)

Usage

from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini

# Build the processor; these values match the constructor defaults.
processor = ObjectDetectionProcessor(
    model_path="yolo11n.pt",
    conf_threshold=0.5,
    fps=10,
)

# Hand the processor to the agent through `processors`.
agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Vision Agent", id="agent"),
    instructions="You can see objects detected in video.",
    llm=gemini.Realtime(fps=3),
    processors=[processor],
)

# NOTE: `await` is only valid inside a coroutine, so run this from an async
# function (for example one driven by asyncio.run).
await agent.start()

Real-World Examples

From the Vision Agents source code:

Security Camera Processor

examples/05_security_camera_example/security_camera_processor.py - Detects faces and packages, tracks unique visitors, emits events when objects appear/disappear.
# Abridged excerpt: remaining constructor parameters and the body are elided.
class SecurityCameraProcessor(VideoProcessorPublisher, Warmable[Optional[Any]]):
    name = "security_camera"
    
    def __init__(self, fps: int = 5, model_path: str = "weights.pt", ...):
        # Face detection with face_recognition library
        # Package detection with YOLO
        # Maintains visitor/package history
        # Emits PersonDetectedEvent, PackageDetectedEvent, etc.

YOLO Pose Processor

plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py - Real-time pose estimation with skeleton visualization.
# Abridged excerpt: remaining constructor parameters and the body are elided.
class YOLOPoseProcessor(VideoProcessorPublisher):
    name = "yolo_pose"
    
    def __init__(self, model_path: str = "yolo26n-pose.pt", ...):
        # Loads YOLO pose model
        # Detects keypoints (17+ body landmarks)
        # Draws skeleton connections and wrist highlights
        # Processes frames in thread pool for performance

Best Practices

1

Use Thread Pools for CPU Work

Always run heavy computation in thread pools to avoid blocking the event loop:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
    self.executor,
    self._cpu_intensive_work,
    data
)
2

Handle Errors Gracefully

Return the original frame on processing errors:
try:
    processed = await self._process(frame)
    await self._video_track.add_frame(processed)
except Exception:
    # Log with traceback, then publish the untouched input frame instead.
    logger.exception("Processing failed")
    await self._video_track.add_frame(frame)
3

Use QueuedVideoTrack

Use QueuedVideoTrack for automatic backpressure handling:
# NOTE(review): max_buffer presumably caps the number of queued frames;
# confirm against QueuedVideoTrack.
self._video_track = QueuedVideoTrack(max_buffer=30)
4

Implement Warmup for Models

Load heavy resources once at startup using the Warmable interface:
class MyProcessor(VideoProcessorPublisher, Warmable[Model]):
    """Processor that obtains its model through the Warmable hooks."""

    async def on_warmup(self) -> Model:
        """Load and return the model."""
        # NOTE(review): if load_model() blocks, consider running it in an
        # executor so warmup does not stall the event loop.
        return load_model()
    
    def on_warmed_up(self, model: Model) -> None:
        """Receive the warmed-up model and keep it on the instance."""
        self._model = model

See Also

Build docs developers (and LLMs) love