Overview
The SessionRegistry is a stateless facade over shared storage that handles multi-node session management. It manages serialization, key naming, and TTL (time-to-live) management without holding any session state. The caller (typically AgentLauncher) owns all session tracking and drives refreshes.
When no storage backend is provided, an InMemorySessionKVStore is used by default, which is suitable for single-node or development environments.
Constructor
SessionRegistry(
store: SessionKVStore | None = None,
*,
node_id: str | None = None,
ttl: float = 30.0
)
store
SessionKVStore | None
default:"None"
Storage backend for session data. If not provided, uses InMemorySessionKVStore by default.
Unique identifier for this node. If not provided, a UUID will be generated.
Time-to-live in seconds for session records. Must be greater than 0.
Properties
node_id
Returns the unique identifier for this node.
Lifecycle Methods
start
async def start() -> None
Initialize the storage backend. Should be called before using the registry.
stop
Close the storage backend and clean up resources.
Session Management Methods
register
async def register(call_id: str, session_id: str) -> None
Write a new session record to storage.
The call identifier associated with this session.
Unique identifier for the session.
remove
async def remove(call_id: str, session_id: str) -> None
Delete all storage keys for a session.
The call identifier associated with this session.
Unique identifier for the session to remove.
get
async def get(call_id: str, session_id: str) -> SessionInfo | None
Look up a session by ID from shared storage.
The call identifier associated with this session.
Unique identifier for the session.
Returns: SessionInfo | None - Session information if found, None otherwise.
get_for_call
async def get_for_call(call_id: str) -> list[SessionInfo]
Return all sessions for a given call across all nodes.
The call identifier to retrieve sessions for.
Returns: list[SessionInfo] - List of all sessions associated with the call.
Metrics Methods
update_metrics
async def update_metrics(
call_id: str,
session_id: str,
metrics: dict[str, int | float | None]
) -> None
Push updated metrics for a session into storage.
The call identifier associated with this session.
Unique identifier for the session.
metrics
dict[str, int | float | None]
required
Metrics data to update for the session.
refresh
async def refresh(sessions: dict[str, str]) -> None
Refresh TTLs for the given sessions.
Mapping of session_id to call_id.
Close Request Methods
request_close
async def request_close(call_id: str, session_id: str) -> None
Set a close flag for a session, enabling asynchronous close from any node.
The call identifier associated with this session.
Unique identifier for the session to close.
get_close_requests
async def get_close_requests(sessions: dict[str, str]) -> list[str]
Return session IDs that have a pending close request.
Mapping of session_id to call_id.
Returns: list[str] - List of session IDs with pending close requests.
Context Manager Support
The SessionRegistry supports async context manager protocol:
async with SessionRegistry(store=my_store) as registry:
await registry.register("call_123", "session_456")
# ... use registry
# Automatically calls start() on enter and stop() on exit
Data Structures
SessionInfo
@dataclass
class SessionInfo:
session_id: str
call_id: str
node_id: str
started_at: float
metrics_updated_at: float
metrics: dict[str, int | float | None] = field(default_factory=dict)
Represents a session registered in the session registry.
Unique identifier for the session.
The call identifier associated with this session.
Identifier of the node hosting this session.
Timestamp (Unix time) when the session was started.
Timestamp (Unix time) when metrics were last updated.
metrics
dict[str, int | float | None]
Dictionary of metric names to values.
from_dict
@classmethod
from_dict(cls, data: dict[str, Any]) -> SessionInfo
Construct a SessionInfo from a dictionary, silently ignoring unknown keys.
Dictionary containing session information.
Returns: SessionInfo - Constructed SessionInfo instance.
Usage Example
from vision_agents.core.agents.session_registry import SessionRegistry
from vision_agents.core.agents.session_registry import RedisSessionKVStore
# For single-node (development)
async with SessionRegistry() as registry:
await registry.register("call_123", "session_456")
info = await registry.get("call_123", "session_456")
print(f"Session started at: {info.started_at}")
# For multi-node (production)
redis_store = RedisSessionKVStore(url="redis://localhost:6379/0")
async with SessionRegistry(store=redis_store, node_id="node-1") as registry:
await registry.register("call_123", "session_789")
await registry.update_metrics("call_123", "session_789", {"tokens": 150})