Skip to main content

Overview

The SessionRegistry is a stateless facade over shared storage that handles multi-node session management. It manages serialization, key naming, and TTL (time-to-live) management without holding any session state. The caller (typically AgentLauncher) owns all session tracking and drives refreshes. When no storage backend is provided, an InMemorySessionKVStore is used by default, which is suitable for single-node or development environments.

Constructor

SessionRegistry(
    store: SessionKVStore | None = None,
    *,
    node_id: str | None = None,
    ttl: float = 30.0
)
store
SessionKVStore | None
default:"None"
Storage backend for session data. If not provided, uses InMemorySessionKVStore by default.
node_id
str | None
default:"None"
Unique identifier for this node. If not provided, a UUID will be generated.
ttl
float
default:"30.0"
Time-to-live in seconds for session records. Must be greater than 0.

Properties

node_id

@property
node_id: str
Returns the unique identifier for this node.

Lifecycle Methods

start

async def start() -> None
Initialize the storage backend. Should be called before using the registry.

stop

async def stop() -> None
Close the storage backend and clean up resources.

Session Management Methods

register

async def register(call_id: str, session_id: str) -> None
Write a new session record to storage.
call_id
str
required
The call identifier associated with this session.
session_id
str
required
Unique identifier for the session.

remove

async def remove(call_id: str, session_id: str) -> None
Delete all storage keys for a session.
call_id
str
required
The call identifier associated with this session.
session_id
str
required
Unique identifier for the session to remove.

get

async def get(call_id: str, session_id: str) -> SessionInfo | None
Look up a session by ID from shared storage.
call_id
str
required
The call identifier associated with this session.
session_id
str
required
Unique identifier for the session.
Returns: SessionInfo | None - Session information if found, None otherwise.

get_for_call

async def get_for_call(call_id: str) -> list[SessionInfo]
Return all sessions for a given call across all nodes.
call_id
str
required
The call identifier to retrieve sessions for.
Returns: list[SessionInfo] - List of all sessions associated with the call.

Metrics Methods

update_metrics

async def update_metrics(
    call_id: str,
    session_id: str,
    metrics: dict[str, int | float | None]
) -> None
Push updated metrics for a session into storage.
call_id
str
required
The call identifier associated with this session.
session_id
str
required
Unique identifier for the session.
metrics
dict[str, int | float | None]
required
Metrics data to update for the session.

refresh

async def refresh(sessions: dict[str, str]) -> None
Refresh TTLs for the given sessions.
sessions
dict[str, str]
required
Mapping of session_id to call_id.

Close Request Methods

request_close

async def request_close(call_id: str, session_id: str) -> None
Set a close flag for a session, enabling asynchronous close from any node.
call_id
str
required
The call identifier associated with this session.
session_id
str
required
Unique identifier for the session to close.

get_close_requests

async def get_close_requests(sessions: dict[str, str]) -> list[str]
Return session IDs that have a pending close request.
sessions
dict[str, str]
required
Mapping of session_id to call_id.
Returns: list[str] - List of session IDs with pending close requests.

Context Manager Support

The SessionRegistry supports async context manager protocol:
async with SessionRegistry(store=my_store) as registry:
    await registry.register("call_123", "session_456")
    # ... use registry
# Automatically calls start() on enter and stop() on exit

Data Structures

SessionInfo

@dataclass
class SessionInfo:
    session_id: str
    call_id: str
    node_id: str
    started_at: float
    metrics_updated_at: float
    metrics: dict[str, int | float | None] = field(default_factory=dict)
Represents a session registered in the session registry.
session_id
str
Unique identifier for the session.
call_id
str
The call identifier associated with this session.
node_id
str
Identifier of the node hosting this session.
started_at
float
Timestamp (Unix time) when the session was started.
metrics_updated_at
float
Timestamp (Unix time) when metrics were last updated.
metrics
dict[str, int | float | None]
Dictionary of metric names to values.

from_dict

@classmethod
from_dict(cls, data: dict[str, Any]) -> SessionInfo
Construct a SessionInfo from a dictionary, silently ignoring unknown keys.
data
dict[str, Any]
required
Dictionary containing session information.
Returns: SessionInfo - Constructed SessionInfo instance.

Usage Example

from vision_agents.core.agents.session_registry import SessionRegistry
from vision_agents.core.agents.session_registry import RedisSessionKVStore

# For single-node (development)
async with SessionRegistry() as registry:
    await registry.register("call_123", "session_456")
    info = await registry.get("call_123", "session_456")
    print(f"Session started at: {info.started_at}")

# For multi-node (production)
redis_store = RedisSessionKVStore(url="redis://localhost:6379/0")
async with SessionRegistry(store=redis_store, node_id="node-1") as registry:
    await registry.register("call_123", "session_789")
    await registry.update_metrics("call_123", "session_789", {"tokens": 150})

Build docs developers (and LLMs) love