Kura supports caching to avoid recomputing expensive operations like summarization. This guide shows you how to use caching effectively to improve performance and reduce API costs.
## Why Use Caching?
Summarizing conversations with LLMs is often the most expensive part of the pipeline in terms of:
- API costs: Each conversation summary costs money
- Time: LLM calls can take seconds per conversation
- Rate limits: APIs have request limits
Caching allows you to:
- Reuse summaries when running the pipeline multiple times
- Resume from failures without reprocessing
- Experiment with different clustering parameters without re-summarizing
- Reduce costs by 90%+ on subsequent runs
## Cache Strategy Base Class
Kura defines a simple interface for cache strategies (from `kura/base_classes/cache.py:5-16`):

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class CacheStrategy(ABC):
    """Abstract base class for caching strategies."""

    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the cache by key."""
        raise NotImplementedError("Subclasses must implement get method")

    @abstractmethod
    def set(self, key: str, value: Any) -> None:
        """Store a value in the cache with the given key."""
        raise NotImplementedError("Subclasses must implement set method")
```
## Using Disk Cache

Kura provides a disk-based cache implementation using the `diskcache` library.

### Basic Setup
```python
from kura.cache import DiskCacheStrategy
from kura.summarisation import SummaryModel

# Create the cache directory and strategy
cache_strategy = DiskCacheStrategy(cache_dir="./cache")

# Initialize the summary model with the cache
summary_model = SummaryModel(
    model="openai/gpt-4o-mini",
    cache=cache_strategy,
)
```
### How It Works
The cache key is generated from:
- Conversation messages (role and content)
- Response schema
- Prompt (hashed)
- Temperature
- Model name
From `kura/summarisation.py:191-212`:

```python
def _get_cache_key(
    self,
    conversation: Conversation,
    response_schema: Type[T],
    prompt: str,
    temperature: float,
    **kwargs,
) -> str:
    """Generate a cache key from conversation messages and parameters."""
    # Create role-content pairs for each message
    message_data = [(msg.role, msg.content) for msg in conversation.messages]

    # Include all parameters that affect the output
    cache_components = (
        tuple(message_data),
        response_schema.__name__,
        hashlib.md5(prompt.encode()).hexdigest(),
        temperature,
        self.model,
    )

    return hashlib.md5(str(cache_components).encode()).hexdigest()
```
### DiskCacheStrategy Implementation

Here’s the complete implementation (from `kura/cache.py:8-27`):

```python
import os
from typing import Any, Optional

import diskcache

from kura.base_classes.cache import CacheStrategy


class DiskCacheStrategy(CacheStrategy):
    """Disk-based caching strategy using diskcache."""

    def __init__(self, cache_dir: str):
        """Initialize disk cache strategy.

        Args:
            cache_dir: Directory path for cache storage
        """
        os.makedirs(cache_dir, exist_ok=True)
        self.cache = diskcache.Cache(cache_dir)

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the disk cache."""
        return self.cache.get(key)

    def set(self, key: str, value: Any) -> None:
        """Store a value in the disk cache."""
        self.cache.set(key, value)
```
## Using Cache in Your Pipeline

### Create the Cache Strategy

```python
from kura.cache import DiskCacheStrategy

cache_strategy = DiskCacheStrategy("./cache")
```
### Initialize Model with Cache

```python
from kura.summarisation import SummaryModel

summary_model = SummaryModel(
    model="openai/gpt-4o-mini",
    max_concurrent_requests=50,
    cache=cache_strategy,  # Enable caching
)
```
### Run the Pipeline

```python
from kura.types import Conversation
from kura.v1 import summarise_conversations, CheckpointManager

# Load conversations
conversations = Conversation.from_hf_dataset(
    "ivanleomk/synthetic-gemini-conversations",
    max_conversations=1000,
)

# Summarize with caching
checkpoint_mgr = CheckpointManager("./checkpoints")
summaries = await summarise_conversations(
    conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr,
)
```
On the first run, you’ll see:

```
INFO: Starting summarization of 1000 conversations
INFO: Generated 1000 raw summaries
```
On subsequent runs with the same data:

```
INFO: Starting summarization of 1000 conversations
DEBUG: Found cached summary for conversation abc123
DEBUG: Found cached summary for conversation def456
...
INFO: Generated 1000 raw summaries  # Much faster!
```
## Cache Invalidation
The cache is automatically invalidated when:
- Conversation content changes
- Prompt changes
- Temperature changes
- Response schema changes
- Model changes
This means you can safely experiment with different parameters without worrying about stale cache entries.
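This invalidation falls out of how the key is built: every parameter that affects the summary is hashed into the key, so any change simply produces a new key and the stale entry is never looked up again. A standalone sketch of the same mechanism (`make_key` here is a simplified illustration, not Kura's actual method):

```python
import hashlib

def make_key(messages, schema_name, prompt, temperature, model):
    """Hash every input that affects the summary, mimicking Kura's
    cache-key construction: any changed input yields a fresh key."""
    components = (
        tuple(messages),
        schema_name,
        hashlib.md5(prompt.encode()).hexdigest(),
        temperature,
        model,
    )
    return hashlib.md5(str(components).encode()).hexdigest()

messages = [("user", "Hello"), ("assistant", "Hi there")]
base = make_key(messages, "Summary", "Summarise this.", 0.2, "gpt-4o-mini")

# Same inputs -> same key, so the cached summary is reused
assert make_key(messages, "Summary", "Summarise this.", 0.2, "gpt-4o-mini") == base

# Any changed parameter -> different key, so the old entry is never hit
assert make_key(messages, "Summary", "Summarise this.", 0.7, "gpt-4o-mini") != base
assert make_key(messages, "Summary", "New prompt.", 0.2, "gpt-4o-mini") != base
```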
## Custom Cache Strategies

You can implement custom cache strategies for other backends.

### Redis Cache Example
```python
import pickle
from typing import Any, Optional

import redis

from kura.base_classes.cache import CacheStrategy


class RedisCacheStrategy(CacheStrategy):
    """Redis-based caching strategy for distributed systems."""

    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        self.redis_client = redis.Redis(host=host, port=port, db=db)

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the Redis cache."""
        value = self.redis_client.get(key)
        if value is None:
            return None
        return pickle.loads(value)

    def set(self, key: str, value: Any) -> None:
        """Store a value in the Redis cache."""
        serialized = pickle.dumps(value)
        # Store with a 7-day expiration (604800 seconds)
        self.redis_client.setex(key, 604800, serialized)
```
Usage:

```python
redis_cache = RedisCacheStrategy(host="localhost", port=6379)
summary_model = SummaryModel(cache=redis_cache)
```
### Memory Cache Example

For testing or small datasets:

```python
from typing import Any, Optional

from kura.base_classes.cache import CacheStrategy


class MemoryCacheStrategy(CacheStrategy):
    """In-memory caching strategy (not persistent)."""

    def __init__(self):
        self._cache = {}

    def get(self, key: str) -> Optional[Any]:
        return self._cache.get(key)

    def set(self, key: str, value: Any) -> None:
        self._cache[key] = value

    def clear(self):
        """Clear all cached items."""
        self._cache.clear()
```
### S3 Cache Example

For cloud storage:

```python
import pickle
from typing import Any, Optional

import boto3

from kura.base_classes.cache import CacheStrategy


class S3CacheStrategy(CacheStrategy):
    """S3-based caching strategy for cloud deployments."""

    def __init__(self, bucket_name: str, prefix: str = "kura-cache/"):
        self.s3_client = boto3.client("s3")
        self.bucket_name = bucket_name
        self.prefix = prefix

    def _get_s3_key(self, key: str) -> str:
        """Convert a cache key to an S3 object key."""
        return f"{self.prefix}{key}"

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the S3 cache."""
        try:
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=self._get_s3_key(key),
            )
            return pickle.loads(response["Body"].read())
        except self.s3_client.exceptions.NoSuchKey:
            return None

    def set(self, key: str, value: Any) -> None:
        """Store a value in the S3 cache."""
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=self._get_s3_key(key),
            Body=pickle.dumps(value),
        )
```
## Cache vs Checkpoints

Understand the difference between caching and checkpointing:

| Feature | Cache | Checkpoints |
|---|---|---|
| Purpose | Avoid recomputing individual summaries | Save pipeline stage outputs |
| Granularity | Per conversation | Per stage (all summaries, all clusters) |
| Invalidation | Automatic on parameter change | Manual (delete file) |
| Location | `./cache/` | `./checkpoints/` |
| Use case | Resuming failed runs, experimenting | Persisting results, sharing outputs |
**Best practice:** use both together:

```python
from kura.cache import DiskCacheStrategy
from kura.summarisation import SummaryModel
from kura.v1 import CheckpointManager, summarise_conversations

# Cache individual summaries
cache = DiskCacheStrategy("./cache")
summary_model = SummaryModel(cache=cache)

# Checkpoint stage outputs
checkpoint_mgr = CheckpointManager("./checkpoints")

# Run the pipeline with both
summaries = await summarise_conversations(
    conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr,
)
```
## Performance Impact

Caching can dramatically improve performance:

### First Run (No Cache)

```
1000 conversations @ 2s each = ~33 minutes
Cost: $10 in API calls
```

### Second Run (With Cache)

```
1000 conversations @ 0.001s each = ~1 second
Cost: $0
```

### Partial Cache Hit

```
1000 conversations:
- 800 cached: 0.8 seconds
- 200 new: 400 seconds
Total: ~7 minutes (80% faster)
Cost: $2 (80% savings)
```
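The same arithmetic generalises to any hit rate. A quick estimator for your own runs (the per-conversation figures of 2 s and $0.01 are illustrative assumptions, not measured Kura numbers):

```python
def estimate_run(n_conversations, hit_rate, secs_per_call=2.0,
                 cost_per_call=0.01, secs_per_hit=0.001):
    """Estimate wall time (seconds) and API cost for a run,
    given the fraction of conversations already cached."""
    misses = round(n_conversations * (1 - hit_rate))
    hits = n_conversations - misses
    seconds = misses * secs_per_call + hits * secs_per_hit
    cost = misses * cost_per_call
    return seconds, cost

# 80% hit rate over 1000 conversations, as in the example above
seconds, cost = estimate_run(1000, 0.8)
print(f"{seconds / 60:.1f} min, ${cost:.2f}")  # → 6.7 min, $2.00
```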
## Managing Cache Size

The disk cache can grow large over time. Monitor and manage it:

### Check Cache Size

```python
from kura.cache import DiskCacheStrategy

cache = DiskCacheStrategy("./cache")
print(f"Cache size: {cache.cache.volume()} bytes")
print(f"Cache items: {len(cache.cache)}")
```
### Clear Cache

```python
# Clear all cache entries
cache.cache.clear()

# Or delete specific entries (snapshot the keys before mutating)
for key in list(cache.cache.iterkeys()):
    if some_condition(key):
        del cache.cache[key]
```
### Set Cache Limits

```python
import diskcache

# Limit the cache to 1 GB
cache = diskcache.Cache(
    "./cache",
    size_limit=1024**3,  # 1 GB in bytes
)
```
## Best Practices

### Use Separate Cache Directories

For different experiments or models:

```python
cache_v1 = DiskCacheStrategy("./cache/experiment-1")
cache_v2 = DiskCacheStrategy("./cache/experiment-2")
```
### Cache Only Expensive Operations

Don’t cache everything; focus on the expensive operations:

- ✅ LLM summarization calls
- ✅ Embedding generation (if using paid APIs)
- ❌ Clustering (fast and cheap)
- ❌ Dimensionality reduction (fast)
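One way to make this split explicit is a no-op strategy for the cheap stages, so every component can accept the same cache interface while only the expensive ones persist anything. A sketch (`NullCacheStrategy` is not part of Kura; in a real pipeline it would subclass `CacheStrategy`):

```python
from typing import Any, Optional

class NullCacheStrategy:
    """A cache that never stores anything: hand it to cheap stages
    (clustering, dimensionality reduction) where caching isn't worth it."""

    def get(self, key: str) -> Optional[Any]:
        return None  # always a miss, so the stage recomputes

    def set(self, key: str, value: Any) -> None:
        pass  # silently drop the value

cache = NullCacheStrategy()
cache.set("cluster-result", [1, 2, 3])
assert cache.get("cluster-result") is None  # nothing was stored
```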
### Monitor Cache Hits

Track how often the cache is used:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
# Look for "Found cached summary" messages
```
### Clear Cache When Prompts Change

Prompt changes produce new cache keys automatically, so old entries are never reused — but they still occupy disk space. If you modify your summarization prompt, clear the cache to reclaim it:

```python
cache.cache.clear()
print("Cache cleared after prompt change")
```
## Troubleshooting

### Cache Not Working

If summaries aren’t being cached:

1. Verify the cache is passed to the model:

   ```python
   print(summary_model.cache)  # Should not be None
   ```

2. Check the logs:

   ```python
   import logging

   logging.basicConfig(level=logging.DEBUG)
   # Look for cache-related messages
   ```

3. Ensure the cache directory is writable.

### Cache Permission Errors

```shell
# Fix cache directory permissions
chmod -R 755 ./cache
```
### Cache Corruption

If you encounter cache errors, delete and recreate the cache:

```python
import shutil

from kura.cache import DiskCacheStrategy

shutil.rmtree("./cache")
cache = DiskCacheStrategy("./cache")
```
## Next Steps