Subscribe to detection events to trigger agent actions:
from vision_agents.plugins import roboflowimport randomprocessor = roboflow.RoboflowLocalDetectionProcessor( classes=["person", "sports ball"], conf_threshold=0.5, fps=5,)agent = Agent( processors=[processor], ...)questions = [ "What's happening on the field right now?", "Provide an update on the situation.", "What has just happened?",]@agent.events.subscribeasync def on_detection_completed(event: roboflow.DetectionCompletedEvent): """ Trigger commentary when a sports ball is detected. """ ball_detected = any( obj["label"] == "sports ball" for obj in event.objects ) if ball_detected: await agent.simple_response(random.choice(questions))
Use a debouncer to avoid calling the LLM too frequently. See examples/04_football_commentator_example/utils.py.
For non-realtime vision analysis, use VLM plugins:
from vision_agents.plugins import moondreamvlm = moondream.CloudVLM( api_key="YOUR_API_KEY", model="moondream-2",)# Analyze a frameresponse = await vlm.query( image=frame, prompt="Describe what you see in this image.")
Here’s a complete agent that watches a football game and provides commentary:
import random
import time

from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, openai, roboflow


class Debouncer:
    """Rate limiter: callable that returns True at most once per `interval` seconds.

    Uses a monotonic clock so the interval is immune to wall-clock
    adjustments (NTP syncs, DST changes), which `time.time()` is not.
    """

    def __init__(self, interval: float):
        self.interval = interval  # minimum seconds between True results
        self.last_call = None     # monotonic timestamp of the last accepted call

    def __call__(self) -> bool:
        """Return True if `interval` seconds have passed since the last accepted call."""
        now = time.monotonic()
        # First call always passes; afterwards enforce the interval.
        if self.last_call is None or now - self.last_call >= self.interval:
            self.last_call = now
            return True
        return False


async def create_agent(**kwargs) -> Agent:
    """Build a sports-commentator agent that reacts to ball detections.

    Wires a Roboflow local detector into a realtime OpenAI LLM and
    subscribes to detection events, asking for commentary (debounced)
    whenever a sports ball is detected.
    """
    llm = openai.Realtime()
    agent = Agent(
        edge=getstream.Edge(),
        agent_user=User(name="AI Sports Commentator", id="agent"),
        instructions="You are a sports commentator. Provide exciting play-by-play.",
        processors=[
            roboflow.RoboflowLocalDetectionProcessor(
                classes=["person", "sports ball"],
                conf_threshold=0.5,
                fps=5,
            )
        ],
        llm=llm,
    )

    questions = [
        "Provide an update on the situation on the football field.",
        "What has just happened?",
        "What is happening on the field right now?",
    ]
    debouncer = Debouncer(8)  # Call LLM once every 8 seconds max

    @agent.events.subscribe
    async def on_detection_completed(event: roboflow.DetectionCompletedEvent):
        # any() short-circuits and avoids building a throwaway list.
        ball_detected = any(
            obj["label"] == "sports ball" for obj in event.objects
        )
        if ball_detected and debouncer():
            await agent.simple_response(random.choice(questions))

    return agent
# Restrict detection to the labels you care about; raising the
# confidence threshold trades recall for fewer false positives.
processor = roboflow.RoboflowLocalDetectionProcessor(
    classes=["person", "car"],
    conf_threshold=0.7,
)
3. Use Shared Forwarders
When multiple processors need the same video feed, use a `shared_forwarder` to avoid duplicate processing.
4. Handle Frame Drops
# QueuedVideoTrack automatically handles frame drops# when processing can't keep up with input rateself._video_track = QueuedVideoTrack( max_buffer=30 # Keep 30 frames max (1 second at 30 FPS))