The cache module provides AsyncCache — a thread-safe, async-native cache — and the @async_cache decorator that wraps any coroutine function with automatic caching.

Installation

pip install alex-ber-utils

async_cache decorator

The simplest way to add caching is with the @async_cache decorator.
from alexber.utils.cache import async_cache

@async_cache(maxsize=256, policy="LRU", ttl=60)
async def fetch_user(user_id: int) -> dict:
    # Expensive I/O call — only runs on a cache miss
    return await db.get_user(user_id)
Calling fetch_user(42) twice within 60 seconds returns the cached result on the second call.
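
To make the hit/miss behavior concrete, here is a minimal sketch of what an async caching decorator does in principle. It is not the library's implementation: simple_async_cache is a hypothetical stand-in with no eviction, TTL, or locking, and it uses only the standard library.

```python
import asyncio
import functools

def simple_async_cache(func):
    """Hypothetical stand-in for @async_cache: no eviction, TTL, or locking."""
    store = {}

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Build a hashable key from the call arguments
        key = (args, tuple(sorted(kwargs.items())))
        if key in store:
            return store[key]                  # cache hit: skip the coroutine
        result = await func(*args, **kwargs)   # cache miss: run it once
        store[key] = result
        return result

    return wrapper

calls = []  # records every real execution of the wrapped coroutine

@simple_async_cache
async def fetch_user(user_id: int) -> dict:
    calls.append(user_id)
    await asyncio.sleep(0)   # stands in for an expensive I/O call
    return {"id": user_id}

async def demo():
    first = await fetch_user(42)
    second = await fetch_user(42)
    return first, second

first, second = asyncio.run(demo())
print(calls)  # the coroutine body ran only once
```

The second call returns the stored object without awaiting the wrapped coroutine again, which is the behavior described above for calls within the TTL window.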

Accessing the cache instance

The decorator attaches the underlying AsyncCache instance to the wrapped function:
stats = await fetch_user.cache_instance.get_stats()
print(stats)
# {
#   'hits': 5, 'misses': 1, 'hit_miss_ratio': '0.8333',
#   'avg_time': '0.0023 s', 'max_time': '0.0041 s',
#   'min_time': '0.0019 s', 'total_calls': 1,
#   'current_size': 1, 'max_size': 256, 'ttl_sec': '60.0000'
# }

Caching class methods

The decorator detects bound methods via inspect and keys the cache per instance using id(self), so two instances never share cached results:
from alexber.utils.cache import async_cache

class WeatherService:
    def __init__(self, city: str):
        self.city = city

    @async_cache(maxsize=128, policy="LFU")
    async def get_forecast(self, date: str) -> dict:
        return await weather_api.fetch(self.city, date)

service_a = WeatherService("London")
service_b = WeatherService("Paris")

# These are cached independently — different instance IDs
await service_a.get_forecast("2026-03-25")
await service_b.get_forecast("2026-03-25")
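
The per-instance keying described above can be illustrated with a small sketch. The make_key helper is hypothetical and only shows the idea of prefixing the cache key with id(self); the library's actual key layout may differ.

```python
def make_key(instance, args, kwargs):
    """Hypothetical key builder: prefix the key with id(instance)."""
    return (id(instance), args, tuple(sorted(kwargs.items())))

class WeatherService:
    def __init__(self, city):
        self.city = city

service_a = WeatherService("London")
service_b = WeatherService("Paris")

# Same method arguments, different instances: the keys differ
key_a = make_key(service_a, ("2026-03-25",), {})
key_b = make_key(service_b, ("2026-03-25",), {})

# Same instance and arguments: the key is reproduced exactly
key_a_again = make_key(service_a, ("2026-03-25",), {})

print(key_a != key_b, key_a == key_a_again)
```

Note that in CPython, id() values are only guaranteed unique among objects that are alive at the same time, so a scheme like this relies on the instances outliving their cached entries.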

Decorator parameters

  • maxsize (int): Maximum number of entries to hold in the cache. When the cache is full, the least-recently-used (LRU) or least-frequently-used (LFU) entry is evicted, depending on policy. Required when using AsyncCache directly; optional in @async_cache.
  • policy (string, default: "LFU"): Eviction policy. Accepts "LRU" (Least Recently Used) or "LFU" (Least Frequently Used). LFU keeps the entries called most often; LRU keeps the entries called most recently.
  • ttl (float | None, default: None): Time-to-Live in seconds. After this many seconds an entry is treated as expired and the next access triggers a fresh call. None disables expiry.
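
One common way to implement TTL expiry is to store an expiry timestamp next to each value and check it on read. The sketch below assumes that approach; TTLStore and its injectable clock are hypothetical names used for illustration, and the exact boundary behavior (expired at exactly ttl seconds) is an assumption, not something the library documents.

```python
import time

class TTLStore:
    """Hypothetical sketch of TTL expiry; not the AsyncCache implementation."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl        # seconds, or None to disable expiry
        self.clock = clock    # injectable so the demo is deterministic
        self._data = {}       # key -> (value, expires_at or None)

    def set(self, key, value):
        expires_at = None if self.ttl is None else self.clock() + self.ttl
        self._data[key] = (value, expires_at)

    def get(self, key):
        value, expires_at = self._data[key]   # KeyError on a plain miss
        if expires_at is not None and self.clock() >= expires_at:
            del self._data[key]               # expired: drop it and report a miss
            raise KeyError(key)
        return value

# Drive the store with a fake clock instead of sleeping
now = [0.0]
store = TTLStore(ttl=60, clock=lambda: now[0])
store.set("user:42", {"id": 42})

now[0] = 59.0
fresh = store.get("user:42")      # still within the TTL window

now[0] = 60.0
try:
    store.get("user:42")
    expired = False
except KeyError:
    expired = True                # treated as a miss after the TTL elapses

print(fresh, expired)
```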

AsyncCache class

Use AsyncCache directly when you need fine-grained control — for example, to share one cache across multiple functions or to call clear() programmatically.
import time
from alexber.utils.cache import AsyncCache

cache = AsyncCache(maxsize=512, policy="LRU", ttl=300)

async def get_price(ticker: str) -> float:
    try:
        return await cache[ticker]  # hit: return the cached price
    except KeyError:
        # Miss or expired entry: fetch, time the call, then store the result
        start = time.perf_counter_ns()
        price = await market_api.quote(ticker)
        exec_ns = time.perf_counter_ns() - start  # elapsed nanoseconds
        await cache.update_profiling(exec_ns)
        await cache.__setitem__(ticker, price)
        return price

Constructor parameters

  • maxsize (int, required): Maximum number of entries the cache will hold before evicting.
  • policy (string, default: "LFU"): Eviction policy: "LRU" or "LFU".
  • ttl (float | None, default: None): Entry lifetime in seconds. None means entries never expire.

Methods

| Method | Description |
| --- | --- |
| await cache[key] | Return cached value. Raises KeyError on miss or expiry. |
| await cache.__setitem__(key, value) | Store a value. Sets expiry if TTL is configured. |
| await cache.update_profiling(exec_time_ns) | Record an execution time sample (nanoseconds). |
| await cache.get_stats() | Return a dict of hit/miss/timing statistics (see below). |
| await cache.clear() | Evict all entries and reset all statistics. |

Statistics returned by get_stats()

| Key | Type | Description |
| --- | --- | --- |
| hits | int | Number of successful cache lookups. |
| misses | int | Number of cache misses. |
| hit_miss_ratio | str | hits / (hits + misses), formatted to 4 decimal places. |
| avg_time | str | Average execution time across all recorded calls. |
| max_time | str | Longest recorded execution time. |
| min_time | str | Shortest recorded execution time. |
| total_calls | int | Number of calls passed to update_profiling. |
| current_size | int | Number of entries currently in the cache. |
| max_size | int | Configured maxsize. |
| ttl_sec | str | Configured TTL (or "None"). |
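
As a worked example of the hit_miss_ratio formula, the sketch below reproduces the two ratios that appear in this page's sample outputs. The helper name format_hit_ratio is hypothetical, and returning "0.0000" when there have been no lookups is an assumption made to avoid division by zero.

```python
def format_hit_ratio(hits: int, misses: int) -> str:
    """hits / (hits + misses), formatted to 4 decimal places."""
    total = hits + misses
    # Behavior with zero lookups is an assumption of this sketch
    ratio = hits / total if total else 0.0
    return f"{ratio:.4f}"

ratio_a = format_hit_ratio(5, 1)   # 5 hits, 1 miss
ratio_b = format_hit_ratio(1, 2)   # 1 hit, 2 misses
print(ratio_a, ratio_b)            # 0.8333 0.3333
```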

Eviction policies

LRU — Least Recently Used

Evicts the entry that was accessed least recently. Good when access patterns are temporal — recently requested data is likely to be requested again soon.
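
A minimal sketch of LRU eviction, assuming an OrderedDict-based design where every access moves the key to the "most recent" end. LRUSketch is a hypothetical illustration and says nothing about how AsyncCache stores its entries internally.

```python
from collections import OrderedDict

class LRUSketch:
    """Hypothetical LRU store; the oldest-accessed key sits at the front."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._data = OrderedDict()

    def get(self, key):
        value = self._data[key]          # KeyError on miss
        self._data.move_to_end(key)      # mark as most recently used
        return value

    def put(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        elif len(self._data) >= self.maxsize:
            self._data.popitem(last=False)   # evict the least recently used
        self._data[key] = value

    def keys(self):
        return list(self._data)

lru = LRUSketch(maxsize=2)
lru.put("a", 1)
lru.put("b", 2)
lru.get("a")         # "a" is now more recent than "b"
lru.put("c", 3)      # cache is full, so "b" is evicted
lru_keys = lru.keys()
print(lru_keys)      # ['a', 'c']
```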

LFU — Least Frequently Used

Evicts the entry with the lowest access count. Good when some data is accessed far more often than the rest and should be protected from eviction.
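
A minimal sketch of LFU eviction, assuming a per-key access counter. LFUSketch is hypothetical; counting only reads as accesses and breaking ties by insertion order are choices of this sketch, not documented library behavior.

```python
class LFUSketch:
    """Hypothetical LFU store that tracks an access count per key."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._data = {}
        self._counts = {}

    def get(self, key):
        value = self._data[key]      # KeyError on miss
        self._counts[key] += 1       # every read bumps the frequency
        return value

    def put(self, key, value):
        if key not in self._data and len(self._data) >= self.maxsize:
            # Evict the key with the lowest access count
            victim = min(self._counts, key=self._counts.get)
            del self._data[victim]
            del self._counts[victim]
        self._data[key] = value
        self._counts.setdefault(key, 0)

    def keys(self):
        return list(self._data)

lfu = LFUSketch(maxsize=2)
lfu.put("a", 1)
lfu.put("b", 2)
lfu.get("a")
lfu.get("a")
lfu.get("b")         # "b" is the most recent access, but has fewer reads
lfu.put("c", 3)      # evicts "b" (1 read) rather than "a" (2 reads)
lfu_keys = lfu.keys()
print(lfu_keys)      # ['a', 'c']
```

With the same access sequence, an LRU policy would have evicted "a" instead, because "b" was touched most recently. That difference is what the choice of policy controls.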

Cache key internals

The cache automatically converts unhashable argument values (lists, dicts, custom objects) into stable cache keys using two helpers from mains.py.

make_hashable(obj)

Recursively converts an object to a hashable type:
  • Mappings → frozenset of (key, value) pairs
  • Iterables (excluding str / bytes) → tuple
  • Natively hashable objects → returned as-is
  • Everything else → wrapped in HashableWrapper
from alexber.utils.mains import make_hashable

make_hashable({"a": [1, 2], "b": 3})
# frozenset({('a', (1, 2)), ('b', 3)})

make_hashable([1, {"x": 2}])
# (1, frozenset({('x', 2)}))
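
The four conversion rules can be sketched as a short recursive function. This is an illustrative reimplementation using only the standard library, not the code in mains.py: make_hashable_sketch and WrapperSketch are hypothetical names, and the wrapper's behavior is modeled on the description in this page.

```python
from collections.abc import Iterable, Mapping

class WrapperSketch:
    """Hypothetical fallback: hash from str(obj), equality by identity."""

    def __init__(self, obj):
        self.obj = obj

    def __hash__(self):
        return hash(str(self.obj))

    def __eq__(self, other):
        return isinstance(other, WrapperSketch) and self.obj is other.obj

def make_hashable_sketch(obj):
    if isinstance(obj, Mapping):
        # Mappings become a frozenset of (key, converted value) pairs
        return frozenset((k, make_hashable_sketch(v)) for k, v in obj.items())
    if isinstance(obj, Iterable) and not isinstance(obj, (str, bytes)):
        # Other iterables become tuples, converted element by element
        return tuple(make_hashable_sketch(item) for item in obj)
    try:
        hash(obj)
        return obj                   # natively hashable: returned as-is
    except TypeError:
        return WrapperSketch(obj)    # everything else gets wrapped

class Unhashable:
    __hash__ = None                  # instances cannot be hashed directly

key1 = make_hashable_sketch({"a": [1, 2], "b": 3})
key2 = make_hashable_sketch([1, {"x": 2}])
wrapped = make_hashable_sketch(Unhashable())

print(key1 == frozenset({("a", (1, 2)), ("b", 3)}))
print(key2 == (1, frozenset({("x", 2)})))
```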

HashableWrapper

A fallback wrapper for objects that are inherently non-hashable. It derives its hash from str(obj) and compares by identity of the wrapped value.

is_iterable(value) and is_mapping(value)

Two guard utilities used by make_hashable internally:
from alexber.utils.mains import is_iterable, is_mapping

is_iterable([1, 2, 3])   # True
is_iterable("hello")     # False  — str is excluded
is_iterable(None)        # False

is_mapping({"a": 1})     # True
is_mapping(["a", 1])     # False
is_iterable explicitly excludes str and bytes so that string arguments are treated as atomic cache key components rather than sequences of characters.
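
The effect of excluding str and bytes can be seen in a short sketch. The is_iterable_sketch and key_component helpers are hypothetical stand-ins written against collections.abc; they mirror the behavior shown above but are not the library's code.

```python
from collections.abc import Iterable

def is_iterable_sketch(value):
    """Iterable check that treats str and bytes as atomic values."""
    return isinstance(value, Iterable) and not isinstance(value, (str, bytes))

def key_component(value):
    # Only non-string iterables are expanded into tuples
    return tuple(value) if is_iterable_sketch(value) else value

atomic = key_component("hello")      # stays a single string
exploded = tuple("hello")            # what would happen without the exclusion
listed = key_component([1, 2, 3])    # lists are still converted

print(atomic)     # hello
print(exploded)   # ('h', 'e', 'l', 'l', 'o')
print(listed)     # (1, 2, 3)
```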

Full end-to-end example

1. Install the package

pip install alex-ber-utils

2. Decorate your async function

import asyncio
from alexber.utils.cache import async_cache

@async_cache(maxsize=100, policy="LFU", ttl=30)
async def compute(x: int, y: int) -> int:
    await asyncio.sleep(0.1)  # simulate work
    return x + y

3. Call the function and inspect statistics

async def main():
    print(await compute(1, 2))  # miss: runs the function
    print(await compute(1, 2))  # hit: returns cached result
    print(await compute(3, 4))  # miss

    stats = await compute.cache_instance.get_stats()
    print(stats['hits'])            # 1
    print(stats['misses'])          # 2
    print(stats['hit_miss_ratio'])  # '0.3333'

asyncio.run(main())

4. Clear the cache when needed

await compute.cache_instance.clear()

Use policy="LFU" (the default) when a small subset of arguments dominates your call traffic. Switch to policy="LRU" when you expect a sliding window of recent queries with little repetition.
