Video agents combine vision capabilities with voice interaction, enabling agents to see and respond to what’s happening on camera.

Realtime Video with Vision LLMs

Use Gemini or OpenAI’s realtime APIs to stream video directly to the LLM:
```python
from vision_agents.core import Agent, User
from vision_agents.plugins import gemini, getstream

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="AI Golf Coach"),
    instructions="Watch the user's golf swing and provide helpful feedback.",
    llm=gemini.Realtime(fps=3),  # Stream video at 3 FPS
)
```
Start with 3-5 FPS for video streaming. Higher FPS increases token usage significantly.
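To see why frame rate dominates cost, here is a rough back-of-the-envelope estimate. The tokens-per-frame figure below is an illustrative assumption; actual costs vary by model and image resolution:

```python
# Rough sketch: estimate vision-token throughput at different frame rates.
TOKENS_PER_FRAME = 258  # assumed cost of one video frame (illustrative only)

def tokens_per_minute(fps: float, tokens_per_frame: int = TOKENS_PER_FRAME) -> int:
    """Approximate vision tokens consumed per minute of streaming."""
    return int(fps * 60 * tokens_per_frame)

for fps in (1, 3, 5, 10):
    print(f"{fps:>2} FPS -> ~{tokens_per_minute(fps):,} tokens/min")
```

Going from 3 FPS to 10 FPS more than triples token throughput, which is why starting low and increasing only when needed pays off.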

Video Processors

Add processors to detect objects, poses, or analyze frames before sending to the LLM:

Object Detection

Detect objects in real-time using YOLO or Roboflow:
```python
from vision_agents.plugins import roboflow

processor = roboflow.RoboflowLocalDetectionProcessor(
    model_id="rfdetr-seg-preview",
    conf_threshold=0.5,
    fps=10,
    classes=["person", "sports ball"],  # Filter specific objects
    annotate=True,  # Draw bounding boxes
)

agent = Agent(
    processors=[processor],
    ...
)
```
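Downstream code often needs to post-filter detections by class and confidence before acting on them. Here is a minimal helper, assuming each detection is a dict with `label` and `confidence` keys (an assumption about the event payload shape, not a documented schema):

```python
def filter_detections(objects, classes, min_conf=0.5):
    """Keep detections whose label is in `classes` and whose confidence
    meets the threshold. Assumes each detection is a dict with 'label'
    and 'confidence' keys."""
    return [
        obj for obj in objects
        if obj["label"] in classes and obj.get("confidence", 0.0) >= min_conf
    ]

detections = [
    {"label": "person", "confidence": 0.9},
    {"label": "sports ball", "confidence": 0.4},
    {"label": "car", "confidence": 0.8},
]
print(filter_detections(detections, {"person", "sports ball"}))
# -> [{'label': 'person', 'confidence': 0.9}]
```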

Pose Detection

Track human poses using YOLO Pose:
```python
from vision_agents.core import Agent, User
from vision_agents.plugins import gemini, getstream, ultralytics

processor = ultralytics.YOLOPoseProcessor(
    model_path="yolo26n-pose.pt",
    conf_threshold=0.5,
    fps=30,
    enable_hand_tracking=True,
    enable_wrist_highlights=True,
)

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(name="AI Golf Coach"),
    instructions="Read @golf_coach.md",
    llm=gemini.Realtime(fps=3),
    processors=[processor],
)
```

Reacting to Detections

Subscribe to detection events to trigger agent actions:
```python
import random

from vision_agents.plugins import roboflow

processor = roboflow.RoboflowLocalDetectionProcessor(
    classes=["person", "sports ball"],
    conf_threshold=0.5,
    fps=5,
)

agent = Agent(
    processors=[processor],
    ...
)

questions = [
    "What's happening on the field right now?",
    "Provide an update on the situation.",
    "What has just happened?",
]

@agent.events.subscribe
async def on_detection_completed(event: roboflow.DetectionCompletedEvent):
    """Trigger commentary when a sports ball is detected."""
    ball_detected = any(
        obj["label"] == "sports ball"
        for obj in event.objects
    )

    if ball_detected:
        await agent.simple_response(random.choice(questions))
```
Use a debouncer to avoid calling the LLM too frequently. See examples/04_football_commentator_example/utils.py.

Vision Language Models (VLM)

For non-realtime vision analysis, use VLM plugins:
```python
from vision_agents.plugins import moondream

vlm = moondream.CloudVLM(
    api_key="YOUR_API_KEY",
    model="moondream-2",
)

# Analyze a frame
response = await vlm.query(
    image=frame,
    prompt="Describe what you see in this image."
)
```
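If you need continuous analysis rather than one-off queries, you can schedule VLM calls on a timer. Below is a generic asyncio sketch; `_fake_frame` and `_fake_query` are stand-ins for your frame source and the `vlm.query` call, not part of the plugin API:

```python
import asyncio

async def analyze_periodically(get_frame, query, interval=2.0, max_queries=3):
    """Poll a frame source and run a VLM query on a fixed interval.
    `get_frame` and `query` stand in for your camera and VLM call."""
    results = []
    for i in range(max_queries):
        frame = await get_frame()
        results.append(await query(frame))
        if i < max_queries - 1:
            await asyncio.sleep(interval)
    return results

# Demo with stubs standing in for the camera and the VLM:
async def _fake_frame():
    return "frame"

async def _fake_query(frame):
    return f"analysis of {frame}"

results = asyncio.run(analyze_periodically(_fake_frame, _fake_query, interval=0.01))
print(results)  # ['analysis of frame', 'analysis of frame', 'analysis of frame']
```

In a real agent you would replace the stubs with a frame grabber and `vlm.query`, and pick an interval that matches how fast the scene actually changes.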

Complete Example: Football Commentator

Here’s a complete agent that watches a football game and provides commentary:
```python
import random
import time

from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, openai, roboflow

class Debouncer:
    def __init__(self, interval: float):
        self.interval = interval
        self.last_call = 0.0

    def __call__(self) -> bool:
        now = time.time()
        if now - self.last_call >= self.interval:
            self.last_call = now
            return True
        return False

async def create_agent(**kwargs) -> Agent:
    llm = openai.Realtime()

    agent = Agent(
        edge=getstream.Edge(),
        agent_user=User(name="AI Sports Commentator", id="agent"),
        instructions="You are a sports commentator. Provide exciting play-by-play.",
        processors=[
            roboflow.RoboflowLocalDetectionProcessor(
                classes=["person", "sports ball"],
                conf_threshold=0.5,
                fps=5,
            )
        ],
        llm=llm,
    )

    questions = [
        "Provide an update on the situation on the football field.",
        "What has just happened?",
        "What is happening on the field right now?",
    ]

    debouncer = Debouncer(8)  # Call the LLM at most once every 8 seconds

    @agent.events.subscribe
    async def on_detection_completed(event: roboflow.DetectionCompletedEvent):
        ball_detected = any(
            obj["label"] == "sports ball" for obj in event.objects
        )

        if ball_detected and debouncer():
            await agent.simple_response(random.choice(questions))

    return agent
```

Custom Video Processing

Create custom processors by extending VideoProcessorPublisher:
```python
from __future__ import annotations  # defer type hints (aiortc, VideoForwarder)

from dataclasses import dataclass

import av

from vision_agents.core.processors.base_processor import VideoProcessorPublisher
from vision_agents.core.events import EventManager, PluginBaseEvent
from vision_agents.core.utils.video_track import QueuedVideoTrack

@dataclass
class CustomDetectionEvent(PluginBaseEvent):
    type: str = "custom.detection"
    confidence: float = 0.0

class CustomProcessor(VideoProcessorPublisher):
    name = "custom_processor"

    def __init__(self, fps: int = 10):
        self.fps = fps
        self._video_track = QueuedVideoTrack()
        self.events = EventManager()
        self.events.register(CustomDetectionEvent)

    async def process_video(
        self,
        track: aiortc.VideoStreamTrack,
        participant_id: str | None,
        shared_forwarder: VideoForwarder | None = None,
    ) -> None:
        # Process incoming video frames
        async for frame in track:
            processed_frame = await self._process_frame(frame)
            await self._video_track.add_frame(processed_frame)

    async def _process_frame(self, frame: av.VideoFrame) -> av.VideoFrame:
        # Your custom processing logic
        return frame

    def publish_video_track(self) -> aiortc.VideoStreamTrack:
        return self._video_track

    async def stop_processing(self) -> None:
        pass

    async def close(self) -> None:
        pass
```
See Custom Processors for detailed examples.
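Custom processors usually sample frames at their own rate rather than processing every frame. Here is a pure-Python sketch of the time-based throttling an `fps` parameter implies (an assumption about the sampling strategy, not the library's implementation):

```python
import time

class FrameThrottle:
    """Admit at most `fps` frames per second, dropping the rest.
    A sketch of the sampling a processor's `fps` parameter implies."""

    def __init__(self, fps: float, clock=time.monotonic):
        self.min_interval = 1.0 / fps
        self.clock = clock  # injectable clock makes the logic testable
        self._last = float("-inf")

    def should_process(self) -> bool:
        now = self.clock()
        if now - self._last >= self.min_interval:
            self._last = now
            return True
        return False

# With a fake clock: frames arrive every 20 ms (50 FPS); at fps=10 only
# one frame per 100 ms window is admitted.
t = iter(i * 0.02 for i in range(10))
throttle = FrameThrottle(fps=10, clock=lambda: next(t))
decisions = [throttle.should_process() for _ in range(10)]
print(decisions)
```

Inside `_process_frame`, you would check `throttle.should_process()` and return the frame unmodified when it is False.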

Production Best Practices

1. Optimize Frame Rate

  • Start with 3-5 FPS for realtime LLMs
  • Use 10-30 FPS for detection processors
  • Lower FPS reduces token costs significantly
2. Filter Detections

```python
processor = roboflow.RoboflowLocalDetectionProcessor(
    classes=["person", "car"],  # Only detect specific objects
    conf_threshold=0.7,  # Higher threshold = fewer false positives
)
```
3. Use Shared Forwarders

When multiple processors need the same video feed, use shared_forwarder to avoid duplicate processing.
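The fan-out idea behind a shared forwarder can be sketched in plain asyncio: read the source once, then push each frame to every subscriber. This is illustrative only, not the library's `VideoForwarder`:

```python
import asyncio

class FanOut:
    """Read a frame source once and fan frames out to N subscriber
    queues -- the idea behind a shared forwarder (sketch only)."""

    def __init__(self):
        self.queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self.queues.append(q)
        return q

    async def run(self, source):
        async for frame in source:
            for q in self.queues:
                q.put_nowait(frame)  # each subscriber gets every frame

async def frames():
    for i in range(3):
        yield f"frame-{i}"

async def main():
    fan = FanOut()
    a, b = fan.subscribe(), fan.subscribe()
    await fan.run(frames())
    return ([a.get_nowait() for _ in range(3)],
            [b.get_nowait() for _ in range(3)])

got_a, got_b = asyncio.run(main())
print(got_a, got_b)
```

The payoff is that decoding and frame capture happen once, no matter how many processors consume the feed.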
4. Handle Frame Drops

```python
# QueuedVideoTrack automatically handles frame drops
# when processing can't keep up with input rate
self._video_track = QueuedVideoTrack(
    max_buffer=30  # Keep 30 frames max (1 second at 30 FPS)
)
```
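The drop-oldest behavior described above can be sketched with a bounded deque (an assumption about `QueuedVideoTrack`'s semantics, not its actual implementation):

```python
from collections import deque

class DropOldestBuffer:
    """Bounded frame buffer that evicts the oldest frame when full --
    a sketch of the drop behavior described above."""

    def __init__(self, max_buffer: int = 30):
        self.frames = deque(maxlen=max_buffer)
        self.dropped = 0

    def add_frame(self, frame):
        if len(self.frames) == self.frames.maxlen:
            self.dropped += 1  # the deque evicts the oldest frame
        self.frames.append(frame)

buf = DropOldestBuffer(max_buffer=3)
for i in range(5):
    buf.add_frame(i)
print(list(buf.frames), buf.dropped)  # [2, 3, 4] 2
```

Dropping old frames keeps latency bounded: stale video is worse than missing video for a realtime agent.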

Examples

  • Golf Coach: examples/02_golf_coach_example/golf_coach_example.py
  • Football Commentator: examples/04_football_commentator_example/football_commentator_example.py
  • Security Camera: examples/05_security_camera_example/security_camera_example.py
