Processor is the abstract base class for all audio and video processors in the Vision Agents framework. All custom processors must inherit from this class or one of its specialized subclasses.
Class Definition
from vision_agents.core.processors.base_processor import Processor
class Processor(abc.ABC):
"""A base class for all audio and video processors."""
Abstract Properties
name
@property
@abc.abstractmethod
def name(self) -> str:
"""Processor name."""
Unique identifier for the processor instance. Used for logging, event tracking, and debugging.
Processor name identifier
Example:
class CustomProcessor(Processor):
name = "custom_detection"
Abstract Methods
close
async def close(self) -> None:
"""Close the processor and clean up resources when the application exits."""
Called when the application shuts down. Implement this to clean up resources like thread pools, file handles, network connections, or model instances.
Example:
async def close(self) -> None:
self._shutdown = True
if self._video_forwarder:
await self._video_forwarder.remove_frame_handler(self._process_frame)
self.executor.shutdown(wait=False)
logger.info("Processor closed")
Optional Methods
attach_agent
def attach_agent(self, agent: "Agent") -> None:
"""A method to perform action with an Agent inside the processor."""
Called when the processor is attached to an agent. Use this to register custom events, access agent configuration, or set up agent-specific resources.
The agent instance this processor is attached to
Example:
def attach_agent(self, agent: Agent) -> None:
# Register custom event handlers
@agent.events.subscribe
async def on_detection(event: DetectionEvent):
await agent.simple_response(f"Detected {event.objects}")
Specialized Processor Types
Instead of using Processor directly, use one of these specialized base classes:
- VideoProcessor - Process incoming video tracks
- VideoPublisher - Publish outgoing video tracks
- VideoProcessorPublisher - Both process and publish video
- AudioProcessor - Process incoming audio data
- AudioPublisher - Publish outgoing audio tracks
- AudioProcessorPublisher - Both process and publish audio
Complete Example
import logging
from vision_agents.core.processors.base_processor import Processor
logger = logging.getLogger(__name__)
class SimpleLoggingProcessor(Processor):
"""Logs when processor lifecycle events occur."""
name = "simple_logger"
def __init__(self):
self._active = True
logger.info(f"Processor {self.name} initialized")
def attach_agent(self, agent: "Agent") -> None:
logger.info(f"Processor {self.name} attached to agent {agent.id}")
async def close(self) -> None:
self._active = False
logger.info(f"Processor {self.name} closed")
Usage
from vision_agents.core import Agent, User
from vision_agents.plugins import getstream, gemini
processor = SimpleLoggingProcessor()
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="Agent", id="agent"),
instructions="Test agent with processor",
llm=gemini.Realtime(),
processors=[processor], # Attach processor
)
await agent.start()
# Processor lifecycle:
# 1. __init__() - Processor created
# 2. attach_agent() - Attached to agent
# 3. ... processor active ...
# 4. close() - Cleanup when agent stops
Best Practices
Always Clean Up Resources
Implement close() to properly shut down thread pools, network connections, and models:async def close(self) -> None:
self._shutdown = True
self.executor.shutdown(wait=False)
Use Descriptive Names
Choose clear, unique processor names for logging and debugging:name = "yolo_pose_detection" # Good
name = "processor1" # Bad
Handle Exceptions
Catch and log exceptions to prevent processor failures from crashing the agent:async def close(self) -> None:
try:
await self._cleanup()
except Exception:
logger.exception("Cleanup failed")
See Also