Skip to main content

Overview

The SessionKVStore is an abstract base class that defines the interface for TTL (time-to-live) key-value storage backends used by the SessionRegistry. The storage layer is a generic key-value store that works with bytes and knows nothing about nodes or sessions — the SessionRegistry owns the key scheme and all serialization. Implementations should use TTL-based key expiry. Records that are not refreshed within the TTL period are considered expired and may be garbage-collected by the backend.

Key Conventions

The following key conventions are managed by SessionRegistry:
  • sessions/{call_id}/{session_id} → JSON-serialized SessionInfo
  • close_requests/{call_id}/{session_id} → empty bytes (close flag)

Interface Methods

Lifecycle Methods

start

async def start() -> None
Initialize the storage backend (open connections, etc.). Default implementation is a no-op.

close

async def close() -> None
Close any connections held by this storage backend. Default implementation is a no-op.

Storage Methods

set

async def set(
    key: str,
    value: bytes,
    ttl: float,
    *,
    only_if_exists: bool = False
) -> None
Store a value with a TTL. If the key already exists, the value and TTL are overwritten (upsert). The record should expire after ttl seconds if not refreshed.
key
str
required
The key to store.
value
bytes
required
The value as bytes.
ttl
float
required
Time-to-live in seconds.
only_if_exists
bool
default:"False"
When True, the write is silently skipped if the key does not already exist.

mset

async def mset(items: list[tuple[str, bytes, float]]) -> None
Store multiple values with TTLs. Each item is a (key, value, ttl) tuple. Semantics per key are the same as set.
items
list[tuple[str, bytes, float]]
required
A list of (key, value, ttl) tuples.

get

async def get(key: str) -> bytes | None
Retrieve a value by key.
key
str
required
The key to retrieve.
Returns: bytes | None - The value as bytes, or None if the key does not exist or has expired.

mget

async def mget(keys: list[str]) -> list[bytes | None]
Retrieve multiple values by key. Returns a list of values in the same order as the input keys. Missing or expired keys are returned as None.
keys
list[str]
required
The keys to retrieve.
Returns: list[bytes | None] - A list of values (or None) in the same order as the input keys.

expire

async def expire(*keys: str, ttl: float) -> None
Refresh the TTL on one or more existing keys without changing their values. Keys that do not exist are silently ignored.
keys
str
required
One or more keys to update.
ttl
float
required
New time-to-live in seconds.

keys

async def keys(prefix: str) -> list[str]
Return all non-expired keys that start with prefix.
prefix
str
required
The key prefix to match.
Returns: list[str] - A list of matching key strings.

delete

async def delete(keys: list[str]) -> None
Delete one or more keys. Keys that do not exist are silently ignored.
keys
list[str]
required
The keys to delete.

Built-in Implementations

InMemorySessionKVStore

In-memory TTL key-value store suitable for single-node deployments, development, and testing.
InMemorySessionKVStore(*, cleanup_interval: float = 60.0)
cleanup_interval
float
default:"60.0"
Seconds between periodic expired-key sweeps.
Features:
  • Single-node only (not shared across processes)
  • Expired keys are cleaned up both lazily (on access) and periodically (via background task)
  • Lightweight and fast for local development
Example:
from vision_agents.core.agents.session_registry import InMemorySessionKVStore

store = InMemorySessionKVStore(cleanup_interval=30.0)
async with store:
    await store.set("key1", b"value1", ttl=60.0)
    result = await store.get("key1")

RedisSessionKVStore

Redis-backed TTL key-value store suitable for multi-node deployments where session state must be shared across processes or machines.
RedisSessionKVStore(
    *,
    client: redis.Redis | None = None,
    url: str | None = None,
    key_prefix: str = "vision_agents:"
)
client
redis.Redis | None
default:"None"
An existing redis.asyncio.Redis client. Caller owns the lifecycle. Mutually exclusive with url.
url
str | None
default:"None"
A Redis connection URL (e.g. redis://localhost:6379/0). The store creates and owns the client. Mutually exclusive with client.
key_prefix
str
default:"vision_agents:"
Prefix prepended to every key for namespacing.
Features:
  • Supports multi-node deployments
  • Shared session state across processes and machines
  • Uses Redis native TTL features (PX, PEXPIRE)
  • Non-blocking key scanning with SCAN
Example:
from vision_agents.core.agents.session_registry import RedisSessionKVStore
import redis.asyncio as redis

# Using URL (store owns the client)
store = RedisSessionKVStore(url="redis://localhost:6379/0")
async with store:
    await store.set("key1", b"value1", ttl=60.0)
    result = await store.get("key1")

# Using existing client (caller owns the client)
client = redis.from_url("redis://localhost:6379/0")
store = RedisSessionKVStore(client=client, key_prefix="my_app:")
async with store:
    await store.mset([("key1", b"val1", 60.0), ("key2", b"val2", 120.0)])
    results = await store.mget(["key1", "key2"])

Context Manager Support

All SessionKVStore implementations support async context manager protocol:
async with store:
    # Automatically calls start() on enter
    await store.set("key", b"value", 60.0)
    # Automatically calls close() on exit

Implementing a Custom Store

To implement a custom storage backend, subclass SessionKVStore and implement all abstract methods:
from vision_agents.core.agents.session_registry import SessionKVStore

class MyCustomStore(SessionKVStore):
    async def start(self) -> None:
        # Initialize your storage backend
        pass
    
    async def close(self) -> None:
        # Clean up resources
        pass
    
    async def set(
        self, key: str, value: bytes, ttl: float, *, only_if_exists: bool = False
    ) -> None:
        # Implement storage logic
        pass
    
    async def mset(self, items: list[tuple[str, bytes, float]]) -> None:
        # Implement batch storage logic
        pass
    
    async def expire(self, *keys: str, ttl: float) -> None:
        # Implement TTL refresh logic
        pass
    
    async def get(self, key: str) -> bytes | None:
        # Implement retrieval logic
        pass
    
    async def mget(self, keys: list[str]) -> list[bytes | None]:
        # Implement batch retrieval logic
        pass
    
    async def keys(self, prefix: str) -> list[str]:
        # Implement prefix search logic
        pass
    
    async def delete(self, keys: list[str]) -> None:
        # Implement deletion logic
        pass

Performance Considerations

  • InMemorySessionKVStore: Fastest for single-node, but not suitable for multi-node deployments
  • RedisSessionKVStore: Network overhead but enables multi-node coordination
  • Use mset and mget for batch operations to reduce round trips
  • The cleanup_interval in InMemorySessionKVStore affects memory usage vs. CPU usage tradeoff
  • Redis SCAN operations are non-blocking and safe for production use

Build docs developers (and LLMs) love