Kura’s architecture is designed for extensibility. You can create a custom implementation of any component by subclassing the appropriate base class. This guide shows how to build custom models for embedding, summarization, and clustering, as well as custom cache strategies.
Base Classes Overview
Kura provides base classes for all major components:
from kura.base_classes import (
    BaseEmbeddingModel,
    BaseSummaryModel,
    BaseClusteringMethod,
    BaseClusterDescriptionModel,
    BaseMetaClusterModel,
    BaseDimensionalityReduction,
    BaseCheckpointManager,
)
Custom Embedding Models
Embedding models convert text to vector representations. Here’s how to create a custom embedding model:
Base Class Requirements
from abc import ABC, abstractmethod

class BaseEmbeddingModel(ABC):
    @abstractmethod
    async def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed a list of texts into a list of embedding vectors (lists of floats)."""
        pass

    @abstractmethod
    def slug(self) -> str:
        """Return a unique identifier for the embedding model.

        This is used to identify the embedding model in the checkpoint manager.
        """
        pass
Example: OpenAI Embedding Model
Here’s how Kura implements the OpenAI embedding model (from kura/embedding.py:39-108):
from kura.base_classes import BaseEmbeddingModel
from openai import AsyncOpenAI
from asyncio import Semaphore, gather
from tenacity import retry, wait_fixed, stop_after_attempt
import logging

logger = logging.getLogger(__name__)

def batch_texts(texts: list[str], batch_size: int) -> list[list[str]]:
    """Split texts into batches of at most batch_size items.

    A minimal equivalent of the batching helper defined in kura/embedding.py,
    included here so the example is self-contained.
    """
    return [texts[i : i + batch_size] for i in range(0, len(texts), batch_size)]

class OpenAIEmbeddingModel(BaseEmbeddingModel):
    def __init__(
        self,
        model_name: str = "text-embedding-3-small",
        model_batch_size: int = 50,
        n_concurrent_jobs: int = 5,
    ):
        self.client = AsyncOpenAI()
        self.model_name = model_name
        self._model_batch_size = model_batch_size
        self._n_concurrent_jobs = n_concurrent_jobs
        self._semaphore = Semaphore(n_concurrent_jobs)
        logger.info(
            f"Initialized OpenAIEmbeddingModel with model={model_name}, "
            f"batch_size={model_batch_size}, concurrent_jobs={n_concurrent_jobs}"
        )

    def slug(self) -> str:
        return f"openai:{self.model_name}-batchsize:{self._model_batch_size}-concurrent:{self._n_concurrent_jobs}"

    @retry(wait=wait_fixed(3), stop=stop_after_attempt(3))
    async def _embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a single batch of texts."""
        async with self._semaphore:
            try:
                logger.debug(f"Embedding batch of {len(texts)} texts")
                resp = await self.client.embeddings.create(
                    input=texts, model=self.model_name
                )
                embeddings = [item.embedding for item in resp.data]
                logger.debug(f"Successfully embedded {len(texts)} texts")
                return embeddings
            except Exception as e:
                logger.error(f"Failed to embed batch: {e}")
                raise

    async def embed(self, texts: list[str]) -> list[list[float]]:
        if not texts:
            return []

        # Create batches
        batches = batch_texts(texts, self._model_batch_size)

        # Process all batches concurrently
        tasks = [self._embed_batch(batch) for batch in batches]
        results_list_of_lists = await gather(*tasks)

        # Flatten results
        embeddings = []
        for result_batch in results_list_of_lists:
            embeddings.extend(result_batch)
        return embeddings
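To sanity-check a custom embedding model, you can call embed directly. A minimal sketch, assuming OPENAI_API_KEY is set in your environment (the 1536-dimension output applies to text-embedding-3-small):
import asyncio

async def main():
    model = OpenAIEmbeddingModel(model_batch_size=20)
    vectors = await model.embed(["hello world", "goodbye world"])
    # Expect 2 vectors of 1536 floats each for text-embedding-3-small
    print(len(vectors), len(vectors[0]))

asyncio.run(main())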
For local embedding models (from kura/embedding.py:111-169):
from kura.base_classes import BaseEmbeddingModel
from sentence_transformers import SentenceTransformer

class SentenceTransformerEmbeddingModel(BaseEmbeddingModel):
    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        model_batch_size: int = 128,
        device: str = "cpu",
    ):
        self.model = SentenceTransformer(model_name, device=device)
        self.model_name = model_name
        self._model_batch_size = model_batch_size

    def slug(self) -> str:
        return f"sentence-transformers:{self.model_name}-batchsize:{self._model_batch_size}"

    async def embed(self, texts: list[str]) -> list[list[float]]:
        if not texts:
            return []

        # Create batches (batch_texts is the same helper shown above)
        batches = batch_texts(texts, self._model_batch_size)

        # Process batches sequentially on the local device
        embeddings = []
        for batch in batches:
            batch_embeddings = self.model.encode(batch).tolist()
            embeddings.extend(batch_embeddings)
        return embeddings
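Note that SentenceTransformer.encode is synchronous, so the embed call above blocks the event loop while it runs. If you mix local embedding with concurrent API calls, one option is to offload encoding to a worker thread. A minimal sketch, assuming Python 3.9+ for asyncio.to_thread (the subclass name is illustrative):
import asyncio

class NonBlockingSentenceTransformerModel(SentenceTransformerEmbeddingModel):
    async def embed(self, texts: list[str]) -> list[list[float]]:
        if not texts:
            return []
        # encode() does its own internal batching, so we hand it the full
        # list and run the CPU-bound work in a worker thread.
        result = await asyncio.to_thread(self.model.encode, texts)
        return result.tolist()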
Custom Summarization Models
Summarization models convert conversations into structured summaries.
Base Class Requirements
from abc import ABC, abstractmethod
from typing import Type, TypeVar
from kura.types import Conversation, ConversationSummary, GeneratedSummary

T = TypeVar("T", bound=GeneratedSummary)

class BaseSummaryModel(ABC):
    @abstractmethod
    async def summarise(
        self,
        conversations: list[Conversation],
        prompt: str,
        *,
        response_schema: Type[T] = GeneratedSummary,
        temperature: float = 0.2,
        **kwargs,
    ) -> list[ConversationSummary]:
        """Summarise conversations with configurable parameters."""
        pass

    @property
    @abstractmethod
    def checkpoint_filename(self) -> str:
        """Return the filename to use for checkpointing this model's output."""
        pass
Example: Custom Summary Model with Caching
Here’s how to create a summary model with disk caching:
import asyncio
import hashlib
from typing import Type, TypeVar

from kura.base_classes import BaseSummaryModel
from kura.cache import DiskCacheStrategy
from kura.types import Conversation, ConversationSummary, GeneratedSummary

T = TypeVar("T", bound=GeneratedSummary)

class CustomSummaryModel(BaseSummaryModel):
    def __init__(
        self,
        model: str = "openai/gpt-4o-mini",
        max_concurrent_requests: int = 50,
        cache_dir: str = "./cache",
    ):
        self.model = model
        self.max_concurrent_requests = max_concurrent_requests
        self._checkpoint_filename = "custom_summaries"

        # Initialize disk cache
        self.cache = DiskCacheStrategy(cache_dir)
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)

    @property
    def checkpoint_filename(self) -> str:
        return self._checkpoint_filename

    async def summarise(
        self,
        conversations: list[Conversation],
        prompt: str,
        *,
        response_schema: Type[T] = GeneratedSummary,
        temperature: float = 0.2,
        **kwargs,
    ) -> list[ConversationSummary]:
        import instructor

        client = instructor.from_provider(self.model, async_client=True)
        summaries = []

        for conversation in conversations:
            # Check cache first
            cache_key = self._get_cache_key(conversation, prompt)
            cached = self.cache.get(cache_key)
            if cached:
                summaries.append(cached)
                continue

            # Generate summary (the semaphore limits concurrency when
            # requests are issued in parallel)
            async with self.semaphore:
                resp = await client.chat.completions.create(
                    temperature=temperature,
                    messages=[{"role": "user", "content": prompt}],
                    context={"conversation": conversation},
                    response_model=response_schema,
                )

            summary = ConversationSummary(
                chat_id=conversation.chat_id,
                summary=resp.summary,
                request=resp.request,
                task=resp.task,
                metadata=conversation.metadata,
            )

            # Cache the result
            self.cache.set(cache_key, summary)
            summaries.append(summary)

        return summaries

    def _get_cache_key(self, conversation: Conversation, prompt: str) -> str:
        message_data = [(msg.role, msg.content) for msg in conversation.messages]
        return hashlib.md5(str((message_data, prompt)).encode()).hexdigest()
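One thing to watch: the loop above awaits each request before starting the next, so the semaphore never actually throttles anything. To get real concurrency, fan the work out with asyncio.gather. A hedged sketch, where _summarise_one is a hypothetical helper wrapping the per-conversation cache check and API call from the loop body:
import asyncio

async def summarise(self, conversations, prompt, **kwargs):
    # _summarise_one (hypothetical) acquires self.semaphore internally, so at
    # most max_concurrent_requests calls are in flight at once.
    tasks = [self._summarise_one(c, prompt, **kwargs) for c in conversations]
    return await asyncio.gather(*tasks)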
Custom Clustering Methods
Clustering methods group similar summaries together.
Base Class Requirements
from abc import ABC, abstractmethod
from typing import Union
from kura.types import ConversationSummary

class BaseClusteringMethod(ABC):
    @abstractmethod
    def cluster(
        self,
        items: list[dict[str, Union[ConversationSummary, list[float]]]],
    ) -> dict[int, list[ConversationSummary]]:
        """Cluster items based on their embeddings."""
        pass
Example: K-means Clustering
From kura/cluster.py:325-395:
from kura.base_classes import BaseClusteringMethod
from kura.types import ConversationSummary
from sklearn.cluster import KMeans
from typing import Union
import numpy as np
import math

class KmeansClusteringModel(BaseClusteringMethod):
    def __init__(self, clusters_per_group: int = 10):
        self.clusters_per_group = clusters_per_group

    def cluster(
        self,
        items: list[dict[str, Union[ConversationSummary, list[float]]]],
    ) -> dict[int, list[ConversationSummary]]:
        """Perform K-means clustering on embedded items."""
        if not items:
            raise ValueError("Empty items list provided")

        # Extract embeddings and data
        embeddings = [item["embedding"] for item in items]
        data = [item["item"] for item in items]

        # Calculate number of clusters
        n_clusters = math.ceil(len(data) / self.clusters_per_group)

        # Perform K-means clustering
        X = np.array(embeddings)
        kmeans = KMeans(n_clusters=n_clusters)
        cluster_labels = kmeans.fit_predict(X)

        # Group items by cluster
        result = {
            i: [data[j] for j in range(len(data)) if cluster_labels[j] == i]
            for i in range(n_clusters)
        }
        return result
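A quick smoke test with toy data; plain strings stand in for ConversationSummary objects here, since cluster() only passes them through:
model = KmeansClusteringModel(clusters_per_group=2)
items = [
    {"item": "summary-a", "embedding": [0.0, 0.1]},
    {"item": "summary-b", "embedding": [0.0, 0.2]},
    {"item": "summary-c", "embedding": [5.0, 5.1]},
    {"item": "summary-d", "embedding": [5.0, 5.2]},
]
print(model.cluster(items))
# e.g. {0: ['summary-a', 'summary-b'], 1: ['summary-c', 'summary-d']}
# (cluster numbering is not deterministic)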
Example: HDBSCAN Clustering
For density-based clustering:
from kura.base_classes import BaseClusteringMethod
from kura.types import ConversationSummary
from typing import Union
import hdbscan
import numpy as np

class HDBSCANClusteringModel(BaseClusteringMethod):
    def __init__(self, min_cluster_size: int = 5, min_samples: int = 3):
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples

    def cluster(
        self,
        items: list[dict[str, Union[ConversationSummary, list[float]]]],
    ) -> dict[int, list[ConversationSummary]]:
        """Perform HDBSCAN clustering."""
        if not items:
            raise ValueError("Empty items list provided")

        embeddings = [item["embedding"] for item in items]
        data = [item["item"] for item in items]

        # Perform HDBSCAN clustering
        X = np.array(embeddings)
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=self.min_cluster_size,
            min_samples=self.min_samples,
        )
        cluster_labels = clusterer.fit_predict(X)

        # Group items by cluster (excluding noise points with label -1)
        result = {}
        for i in range(max(cluster_labels) + 1):
            result[i] = [data[j] for j in range(len(data)) if cluster_labels[j] == i]
        return result
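HDBSCAN labels outliers as -1, and the grouping above silently drops them. If you would rather keep noise points, a small addition before the return collects them under their own (hypothetical) -1 key:
# Keep noise points (label -1) instead of discarding them
noise = [data[j] for j in range(len(data)) if cluster_labels[j] == -1]
if noise:
    result[-1] = noise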
Custom Cache Strategies
Cache strategies store and retrieve computed results.
Base Class Requirements
from abc import ABC, abstractmethod
from typing import Any, Optional

class CacheStrategy(ABC):
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the cache by key."""
        pass

    @abstractmethod
    def set(self, key: str, value: Any) -> None:
        """Store a value in the cache with the given key."""
        pass
Example: Disk Cache Strategy
From kura/cache.py:8-27:
from kura.base_classes.cache import CacheStrategy
from typing import Any, Optional
import diskcache
import os

class DiskCacheStrategy(CacheStrategy):
    """Disk-based caching strategy using diskcache."""

    def __init__(self, cache_dir: str):
        """Initialize disk cache strategy.

        Args:
            cache_dir: Directory path for cache storage
        """
        os.makedirs(cache_dir, exist_ok=True)
        self.cache = diskcache.Cache(cache_dir)

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the disk cache."""
        return self.cache.get(key)

    def set(self, key: str, value: Any) -> None:
        """Store a value in the disk cache."""
        self.cache.set(key, value)
Example: Redis Cache Strategy
from kura.base_classes.cache import CacheStrategy
from typing import Any, Optional
import pickle
import redis

class RedisCacheStrategy(CacheStrategy):
    """Redis-based caching strategy."""

    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        self.redis_client = redis.Redis(host=host, port=port, db=db)

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the Redis cache."""
        value = self.redis_client.get(key)
        if value is None:
            return None
        # Note: pickle assumes you trust whatever writes to this cache.
        return pickle.loads(value)

    def set(self, key: str, value: Any) -> None:
        """Store a value in the Redis cache."""
        serialized = pickle.dumps(value)
        self.redis_client.set(key, serialized)
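Either strategy can be exercised directly before wiring it into a model; this roundtrip relies only on the get/set contract shown above:
cache = DiskCacheStrategy("./cache")
cache.set("greeting", {"text": "hello"})
assert cache.get("greeting") == {"text": "hello"}
assert cache.get("missing") is None  # absent keys come back as None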
Using Custom Models
Once you’ve created custom models, use them in your pipeline:
from kura.v1 import (
    summarise_conversations,
    generate_base_clusters_from_conversation_summaries,
    CheckpointManager,
)

# Initialize your custom models
embedding_model = SentenceTransformerEmbeddingModel(
    model_name="all-mpnet-base-v2",
    device="cuda",
)
summary_model = CustomSummaryModel(
    model="openai/gpt-4o-mini",
    cache_dir="./summary_cache",
)
clustering_method = HDBSCANClusteringModel(
    min_cluster_size=10,
)

# Run pipeline with custom models
checkpoint_mgr = CheckpointManager("./checkpoints")

summaries = await summarise_conversations(
    conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr,
)

clusters = await generate_base_clusters_from_conversation_summaries(
    summaries,
    embedding_model=embedding_model,
    clustering_method=clustering_method,
    checkpoint_manager=checkpoint_mgr,
)
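Because these pipeline functions are coroutines, the awaits above must run inside an async context; a small entry point is enough (assuming conversations is already loaded):
import asyncio

async def main():
    # ... build the models and run the two pipeline steps shown above ...
    pass

asyncio.run(main())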
Best Practices
Use Python’s logging module to track model behavior:
import logging
logger = logging.getLogger(__name__)
logger.info(f"Initialized model with config: {config}")
logger.debug(f"Processing batch of {len(items)} items")
Handle errors gracefully with retries and fallbacks:
from tenacity import retry, wait_fixed, stop_after_attempt

@retry(wait=wait_fixed(3), stop=stop_after_attempt(3))
async def embed_batch(self, texts):
    try:
        return await self._embed(texts)
    except Exception as e:
        logger.error(f"Failed to embed batch: {e}")
        raise
Include configuration details in your model slug:
def slug(self) -> str:
    return f"custom:{self.model_name}-batch:{self.batch_size}-temp:{self.temperature}"
Implement caching for expensive operations:
from kura.cache import DiskCacheStrategy
self.cache = DiskCacheStrategy("./cache")
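The usual shape is get-or-compute, where expensive_call is a stand-in for whatever operation you are caching:
cached = self.cache.get(key)
if cached is None:
    cached = await expensive_call()
    self.cache.set(key, cached)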
Next Steps