Processor Base Classes
The framework provides base classes for different processor types:

from vision_agents.core.processors.base_processor import (
Processor, # Base for all processors
VideoProcessor, # Process incoming video
VideoPublisher, # Publish outgoing video
VideoProcessorPublisher, # Both process and publish video
AudioProcessor, # Process incoming audio
AudioPublisher, # Publish outgoing audio
AudioProcessorPublisher, # Both process and publish audio
)
Video Processor Publisher
The most common pattern is VideoProcessorPublisher — it receives video, processes it, and publishes annotated frames back:
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Optional

import aiortc
import av
import cv2
import numpy as np

from vision_agents.core.events import EventManager, PluginBaseEvent
from vision_agents.core.processors.base_processor import VideoProcessorPublisher
from vision_agents.core.utils.video_forwarder import VideoForwarder
from vision_agents.core.utils.video_track import QueuedVideoTrack
from vision_agents.core.warmup import Warmable
logger = logging.getLogger(__name__)
@dataclass
class DetectionEvent(PluginBaseEvent):
    """Event emitted after a detection pass finds objects in a frame."""

    # Fixed event type string; init=False keeps it out of the constructor.
    type: str = field(default="detection.completed", init=False)
    # Detection dicts with "bbox", "confidence", "class" and "label" keys.
    objects: list = field(default_factory=list)
    # Highest confidence among the detections in `objects`.
    confidence: float = 0.0
class CustomDetectionProcessor(VideoProcessorPublisher, Warmable[Optional[Any]]):
    """
    Custom object detection processor.

    - Processes incoming video frames
    - Runs detection model
    - Annotates frames with bounding boxes
    - Publishes processed video back to call
    - Emits events when objects are detected
    """

    name = "custom_detection"

    def __init__(
        self,
        model_path: str = "model.pt",
        conf_threshold: float = 0.5,
        fps: int = 10,
        max_workers: int = 10,
    ):
        """Initialize the processor.

        Args:
            model_path: Path to the detection model weights.
            conf_threshold: Minimum confidence for a detection to be kept.
            fps: Target frames per second for processing and publishing.
            max_workers: Size of the thread pool used for inference.
        """
        # NOTE(review): no base-class __init__ is invoked here — confirm that
        # VideoProcessorPublisher/Warmable do not require super().__init__().
        self.model_path = model_path
        self.conf_threshold = conf_threshold
        self.fps = fps
        self.max_workers = max_workers

        # Video output track published back to the call.
        self._video_track = QueuedVideoTrack()
        self._video_forwarder: Optional[VideoForwarder] = None

        # Model instance; populated by on_warmed_up() after warmup.
        self._model: Optional[Any] = None

        # Thread pool for CPU-intensive work so the event loop stays free.
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix="custom_detection",
        )
        self._shutdown = False

        # Event system for broadcasting detection results.
        self.events = EventManager()
        self.events.register(DetectionEvent)
        # Plain string: the original f-string had no placeholders.
        logger.info("Custom Detection Processor initialized")

    async def on_warmup(self) -> Optional[Any]:
        """
        Load model during warmup phase.

        This runs once at startup and the result is cached. Returns None on
        failure so the processor degrades to passthrough instead of crashing.
        """
        try:
            from ultralytics import YOLO

            # get_running_loop() replaces the deprecated get_event_loop();
            # we are inside a coroutine, so a running loop is guaranteed.
            loop = asyncio.get_running_loop()

            def load_model():
                model = YOLO(self.model_path)
                model.to("cpu")
                return model

            model = await loop.run_in_executor(self.executor, load_model)
            logger.info("Model loaded: %s", self.model_path)
            return model
        except Exception as e:
            # Best-effort: a missing model disables detection but keeps video flowing.
            logger.warning("Model failed to load: %s", e)
            return None

    def on_warmed_up(self, resource: Optional[Any]) -> None:
        """Receive the warmed-up model instance."""
        self._model = resource

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: Optional[str],
        shared_forwarder: Optional[VideoForwarder] = None,
    ) -> None:
        """
        Start processing video from the incoming track.

        Args:
            track: Incoming video track to consume.
            participant_id: ID of the participant owning the track (unused here).
            shared_forwarder: Optional forwarder shared with other processors;
                when omitted a dedicated one is created for this track.
        """
        logger.info("Starting video processing at %s FPS", self.fps)
        # Use shared forwarder if provided (avoids decoding the same track
        # twice), or create a new one.
        self._video_forwarder = shared_forwarder or VideoForwarder(
            track,
            max_buffer=self.fps,  # roughly one second of frames
            fps=self.fps,
            name="custom_detection_forwarder",
        )
        # Register frame handler at our target rate.
        self._video_forwarder.add_frame_handler(
            self._process_and_publish_frame,
            fps=float(self.fps),
            name="detection",
        )

    async def _process_and_publish_frame(self, frame: av.VideoFrame) -> None:
        """Process a single frame and publish result."""
        # Pass frames through untouched when shutting down or when the model
        # never loaded, so the published track never stalls.
        if self._shutdown or not self._model:
            await self._video_track.add_frame(frame)
            return
        try:
            # Convert frame to numpy array.
            frame_bgr = frame.to_ndarray(format="bgr24")
            # Run detection in the thread pool, off the event loop.
            loop = asyncio.get_running_loop()
            detections = await loop.run_in_executor(
                self.executor,
                self._detect_objects,
                frame_bgr,
            )
            # Annotate and convert back to an av.VideoFrame.
            annotated_frame = self._annotate_frame(frame_bgr, detections)
            output_frame = av.VideoFrame.from_ndarray(
                annotated_frame,
                format="bgr24",
            )
            # Publish frame.
            await self._video_track.add_frame(output_frame)
            # Emit detection event only when something was found.
            if detections:
                self.events.send(DetectionEvent(
                    plugin_name=self.name,
                    objects=detections,
                    confidence=max(d["confidence"] for d in detections),
                ))
        except Exception:
            # Never drop the frame: publish the original on any failure.
            logger.exception("Frame processing failed")
            await self._video_track.add_frame(frame)

    def _detect_objects(self, frame_bgr: np.ndarray) -> list:
        """
        Run object detection (synchronous, runs in thread pool).

        Returns:
            A list of dicts with "bbox", "confidence", "class" and "label"
            keys; empty when no model is available or nothing is detected.
        """
        if not self._model:
            return []
        results = self._model(
            frame_bgr,
            verbose=False,
            conf=self.conf_threshold,
        )
        detections = []
        if results and results[0].boxes:
            for box in results[0].boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                cls = int(box.cls[0])
                detections.append({
                    "bbox": (int(x1), int(y1), int(x2), int(y2)),
                    "confidence": float(box.conf[0]),
                    "class": cls,
                    "label": results[0].names[cls],
                })
        return detections

    def _annotate_frame(
        self,
        frame_bgr: np.ndarray,
        detections: list,
    ) -> np.ndarray:
        """
        Draw bounding boxes and labels on a copy of the frame.
        """
        annotated = frame_bgr.copy()
        for det in detections:
            x1, y1, x2, y2 = det["bbox"]
            conf = det["confidence"]
            label = det["label"]
            # Green rectangle around the detection.
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Label with confidence, just above the box.
            text = f"{label} {conf:.2f}"
            cv2.putText(
                annotated,
                text,
                (x1, y1 - 5),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2,
            )
        return annotated

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        """Return the video track to publish to the call."""
        return self._video_track

    async def stop_processing(self) -> None:
        """Stop processing video by detaching our frame handler."""
        if self._video_forwarder:
            await self._video_forwarder.remove_frame_handler(
                self._process_and_publish_frame
            )

    async def close(self) -> None:
        """Clean up resources."""
        self._shutdown = True
        # Reuse stop_processing() instead of duplicating the detach logic.
        await self.stop_processing()
        self.executor.shutdown(wait=False)
Using the Processor
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini
# Instantiate the detection processor with a concrete model and settings.
processor = CustomDetectionProcessor(
    model_path="yolo11n.pt",
    conf_threshold=0.5,
    fps=10,
)

# Wire the processor into an agent alongside the edge transport and an LLM.
agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="Detection Agent", id="agent"),
    instructions="You can see objects detected in the video.",
    llm=gemini.Realtime(fps=3),
    processors=[processor],
)


# React to detections
@agent.events.subscribe
async def on_detection(event: DetectionEvent):
    # Called for every DetectionEvent emitted by the processor.
    if len(event.objects) > 0:
        logger.info(f"Detected {len(event.objects)} objects")
        # Trigger agent action
        await agent.simple_response(
            f"I detected {len(event.objects)} objects. What should I do?"
        )
Audio Processor
Process incoming audio data:

from vision_agents.core.processors.base_processor import AudioProcessor
from getstream.video.rtc import PcmData
class CustomAudioProcessor(AudioProcessor):
    """Minimal audio processor: inspects each PCM chunk and logs its size."""

    name = "custom_audio"

    async def process_audio(self, audio_data: PcmData) -> None:
        """
        Handle one chunk of incoming audio.

        Args:
            audio_data: PcmData with samples and metadata;
                audio_data.participant carries the participant ID.
        """
        # Unpack the chunk: raw samples (numpy array) plus stream metadata.
        raw = audio_data.samples
        rate = audio_data.sample_rate
        n_channels = audio_data.channels
        # Placeholder for real processing — currently just reports the chunk.
        logger.info(f"Processing {len(raw)} samples at {rate}Hz")

    async def close(self) -> None:
        """Nothing to clean up for this processor."""
        pass
Warmable Resources
Use the Warmable mixin to load heavy resources (models, databases) once at startup:
from vision_agents.core.warmup import Warmable
class MyProcessor(VideoProcessorPublisher, Warmable[YourModelType]):
    # Illustrative skeleton — YourModelType is a placeholder for your model class.

    def __init__(self):
        # Filled in by on_warmed_up() once warmup completes.
        self._model: Optional[YourModelType] = None

    async def on_warmup(self) -> YourModelType:
        """
        Load resource once at startup.
        Runs in thread pool automatically.
        Result is cached globally and shared between instances.
        """
        model = YourModelType.load("path/to/model")
        return model

    def on_warmed_up(self, model: YourModelType) -> None:
        """
        Receive the warmed-up resource.
        Called every time a new agent is initialized.
        """
        self._model = model
Warmable resources are loaded once and shared between all processor instances in the application. This improves startup time and memory usage.
Event System
Emit custom events for other components to react to:

from vision_agents.core.events import EventManager, PluginBaseEvent
from dataclasses import dataclass, field
@dataclass
class MyCustomEvent(PluginBaseEvent):
    """Example custom event payload."""

    # Fixed event type string; init=False keeps it out of the constructor.
    type: str = field(default="my.custom.event", init=False)
    # Free-form message describing what happened.
    data: str = ""
    # Score attached to the event by the emitter.
    confidence: float = 0.0
class MyProcessor(VideoProcessorPublisher):
    # Illustrative skeleton showing how a processor emits custom events.

    def __init__(self):
        # Per-processor event bus; custom event types must be registered
        # before they can be sent.
        self.events = EventManager()
        self.events.register(MyCustomEvent)

    async def _process_frame(self, frame):
        # ... processing logic ...
        # Emit event
        self.events.send(MyCustomEvent(
            plugin_name=self.name,
            data="Something detected",
            confidence=0.95,
        ))
# Subscribe elsewhere (e.g. in application code) to react to the event.
@agent.events.subscribe
async def on_my_event(event: MyCustomEvent):
    logger.info(f"Event received: {event.data}")
Production Best Practices
Use Thread Pools
Always run CPU-intensive work in thread pools:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
self.executor,
self._heavy_computation,
data
)
Handle Shutdown Gracefully
async def close(self) -> None:
    """Stop accepting work and release the thread pool."""
    self._shutdown = True
    # wait=False: don't block shutdown on in-flight tasks.
    self.executor.shutdown(wait=False)
Use QueuedVideoTrack
QueuedVideoTrack automatically handles backpressure and frame drops:
# Bounded queue: excess frames are dropped under backpressure.
self._video_track = QueuedVideoTrack(
    max_buffer=30  # Keep 30 frames max
)
Complete Examples
- Security Camera: examples/05_security_camera_example/security_camera_processor.py
- YOLO Pose: plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py
- Roboflow: plugins/roboflow/vision_agents/plugins/roboflow/roboflow_local_processor.py
Next Steps
- Learn about RAG Integration
- Monitor processor performance: Observability
- Deploy to production: Deployment