
Kura’s architecture is designed for extensibility. You can create custom implementations of any component by implementing the appropriate base class. This guide shows you how to create custom models for embedding, summarization, and clustering.

Base Classes Overview

Kura provides base classes for all major components:
from kura.base_classes import (
    BaseEmbeddingModel,
    BaseSummaryModel,
    BaseClusteringMethod,
    BaseClusterDescriptionModel,
    BaseMetaClusterModel,
    BaseDimensionalityReduction,
    BaseCheckpointManager
)

Custom Embedding Models

Embedding models convert text to vector representations. Here’s how to create a custom embedding model:

Base Class Requirements

from abc import ABC, abstractmethod

class BaseEmbeddingModel(ABC):
    @abstractmethod
    async def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed a list of texts into a list of lists of floats"""
        pass

    @abstractmethod
    def slug(self) -> str:
        """Return a unique identifier for the embedding model.
        This is used to identify the embedding model in the checkpoint manager.
        """
        pass
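
Any subclass that implements these two methods can be dropped in wherever Kura expects an embedding model. Here is a minimal sketch (a hypothetical deterministic model, handy for tests where real embeddings are unnecessary):

import hashlib

class HashEmbeddingModel(BaseEmbeddingModel):
    """Maps each text to a fixed pseudo-random vector derived from its SHA-256 hash."""

    def __init__(self, dimensions: int = 8):
        self.dimensions = dimensions

    def slug(self) -> str:
        return f"hash:dim-{self.dimensions}"

    async def embed(self, texts: list[str]) -> list[list[float]]:
        vectors = []
        for text in texts:
            digest = hashlib.sha256(text.encode()).digest()
            # Scale each byte into [0, 1) to form a deterministic vector
            vectors.append([digest[i % len(digest)] / 256 for i in range(self.dimensions)])
        return vectors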

Example: OpenAI Embedding Model

Here’s how Kura implements the OpenAI embedding model (from kura/embedding.py:39-108):
from kura.base_classes import BaseEmbeddingModel
from openai import AsyncOpenAI
from asyncio import Semaphore, gather
from tenacity import retry, wait_fixed, stop_after_attempt
import logging

logger = logging.getLogger(__name__)

def batch_texts(texts: list[str], batch_size: int) -> list[list[str]]:
    """Split texts into fixed-size batches; defined here so the example is self-contained."""
    return [texts[i : i + batch_size] for i in range(0, len(texts), batch_size)]

class OpenAIEmbeddingModel(BaseEmbeddingModel):
    def __init__(
        self,
        model_name: str = "text-embedding-3-small",
        model_batch_size: int = 50,
        n_concurrent_jobs: int = 5,
    ):
        self.client = AsyncOpenAI()
        self.model_name = model_name
        self._model_batch_size = model_batch_size
        self._n_concurrent_jobs = n_concurrent_jobs
        self._semaphore = Semaphore(n_concurrent_jobs)
        logger.info(
            f"Initialized OpenAIEmbeddingModel with model={model_name}, "
            f"batch_size={model_batch_size}, concurrent_jobs={n_concurrent_jobs}"
        )

    def slug(self) -> str:
        return f"openai:{self.model_name}-batchsize:{self._model_batch_size}-concurrent:{self._n_concurrent_jobs}"

    @retry(wait=wait_fixed(3), stop=stop_after_attempt(3))
    async def _embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed a single batch of texts."""
        async with self._semaphore:
            try:
                logger.debug(f"Embedding batch of {len(texts)} texts")
                resp = await self.client.embeddings.create(
                    input=texts, model=self.model_name
                )
                embeddings = [item.embedding for item in resp.data]
                logger.debug(f"Successfully embedded {len(texts)} texts")
                return embeddings
            except Exception as e:
                logger.error(f"Failed to embed batch: {e}")
                raise

    async def embed(self, texts: list[str]) -> list[list[float]]:
        if not texts:
            return []

        # Create batches
        batches = batch_texts(texts, self._model_batch_size)
        
        # Process all batches concurrently
        tasks = [self._embed_batch(batch) for batch in batches]
        results_list_of_lists = await gather(*tasks)
        
        # Flatten results
        embeddings = []
        for result_batch in results_list_of_lists:
            embeddings.extend(result_batch)
        
        return embeddings

Example: SentenceTransformer Model

For local embedding models (from kura/embedding.py:111-169):
from kura.base_classes import BaseEmbeddingModel
from sentence_transformers import SentenceTransformer

# Reuses the batch_texts helper defined in the OpenAI example above.

class SentenceTransformerEmbeddingModel(BaseEmbeddingModel):
    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        model_batch_size: int = 128,
        device: str = "cpu",
    ):
        self.model = SentenceTransformer(model_name, device=device)
        self.model_name = model_name
        self._model_batch_size = model_batch_size

    def slug(self) -> str:
        return f"sentence-transformers:{self.model_name}-batchsize:{self._model_batch_size}"

    async def embed(self, texts: list[str]) -> list[list[float]]:
        if not texts:
            return []

        # Create batches
        batches = batch_texts(texts, self._model_batch_size)
        
        # Process batches
        embeddings = []
        for batch in batches:
            batch_embeddings = self.model.encode(batch).tolist()
            embeddings.extend(batch_embeddings)
        
        return embeddings
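
Because this model runs entirely locally, it is easy to smoke-test without any API keys (a sketch; the model weights download on first use, and all-MiniLM-L6-v2 produces 384-dimensional vectors):

import asyncio

async def main():
    model = SentenceTransformerEmbeddingModel()
    vectors = await model.embed(["How do I reset my password?"])
    print(len(vectors), len(vectors[0]))  # 1 vector, 384 dimensions

asyncio.run(main())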

Custom Summarization Models

Summarization models convert conversations into structured summaries.

Base Class Requirements

from abc import ABC, abstractmethod
from kura.types import Conversation, ConversationSummary, GeneratedSummary
from typing import Type, TypeVar

T = TypeVar("T", bound=GeneratedSummary)

class BaseSummaryModel(ABC):
    @abstractmethod
    async def summarise(
        self,
        conversations: list[Conversation],
        prompt: str,
        *,
        response_schema: Type[T] = GeneratedSummary,
        temperature: float = 0.2,
        **kwargs,
    ) -> list[ConversationSummary]:
        """Summarise conversations with configurable parameters."""
        pass

    @property
    @abstractmethod
    def checkpoint_filename(self) -> str:
        """Return the filename to use for checkpointing this model's output."""
        pass

Example: Custom Summary Model with Caching

Here’s how to create a summary model with disk caching:
from kura.base_classes import BaseSummaryModel
from kura.cache import DiskCacheStrategy
from kura.types import Conversation, ConversationSummary, GeneratedSummary
from typing import Type, TypeVar
import asyncio

T = TypeVar("T", bound=GeneratedSummary)

class CustomSummaryModel(BaseSummaryModel):
    def __init__(
        self,
        model: str = "openai/gpt-4o-mini",
        max_concurrent_requests: int = 50,
        cache_dir: str = "./cache"
    ):
        self.model = model
        self.max_concurrent_requests = max_concurrent_requests
        self._checkpoint_filename = "custom_summaries"
        
        # Initialize disk cache
        self.cache = DiskCacheStrategy(cache_dir)
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)

    @property
    def checkpoint_filename(self) -> str:
        return self._checkpoint_filename

    async def summarise(
        self,
        conversations: list[Conversation],
        prompt: str,
        *,
        response_schema: Type[T] = GeneratedSummary,
        temperature: float = 0.2,
        **kwargs,
    ) -> list[ConversationSummary]:
        import instructor
        
        client = instructor.from_provider(self.model, async_client=True)
        
        summaries = []
        for conversation in conversations:
            # Check cache first
            cache_key = self._get_cache_key(conversation, prompt)
            cached = self.cache.get(cache_key)
            
            if cached:
                summaries.append(cached)
                continue
            
            # Generate summary
            async with self.semaphore:
                resp = await client.chat.completions.create(
                    temperature=temperature,
                    messages=[{"role": "user", "content": prompt}],
                    context={"conversation": conversation},
                    response_model=response_schema,
                )
                
                summary = ConversationSummary(
                    chat_id=conversation.chat_id,
                    summary=resp.summary,
                    request=resp.request,
                    task=resp.task,
                    metadata=conversation.metadata
                )
                
                # Cache the result
                self.cache.set(cache_key, summary)
                summaries.append(summary)
        
        return summaries
    
    def _get_cache_key(self, conversation: Conversation, prompt: str) -> str:
        import hashlib
        message_data = [(msg.role, msg.content) for msg in conversation.messages]
        return hashlib.md5(str((message_data, prompt)).encode()).hexdigest()
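
A quick way to exercise this model (a sketch that assumes conversations is a list of Conversation objects you have already loaded, and that the call happens inside an async function; the {{ conversation }} placeholder is filled from the context argument via instructor's Jinja templating):

model = CustomSummaryModel(cache_dir="./summary_cache")
summaries = await model.summarise(
    conversations,
    prompt="Summarise this conversation: {{ conversation }}",
)
print(summaries[0].summary)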

Custom Clustering Methods

Clustering methods group similar summaries together.

Base Class Requirements

from abc import ABC, abstractmethod
from kura.types import ConversationSummary
from typing import Union

class BaseClusteringMethod(ABC):
    @abstractmethod
    def cluster(
        self, 
        items: list[dict[str, Union[ConversationSummary, list[float]]]]
    ) -> dict[int, list[ConversationSummary]]:
        """Cluster items based on their embeddings."""
        pass
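
The items argument pairs each summary with its embedding. Here is a minimal sketch of wiring the pieces together (assumes summaries, embedding_model, and clustering_method from the surrounding examples are in scope, inside an async function):

texts = [s.summary for s in summaries]
embeddings = await embedding_model.embed(texts)

# cluster() expects one dict per summary with "item" and "embedding" keys
items = [
    {"item": summary, "embedding": embedding}
    for summary, embedding in zip(summaries, embeddings)
]
clusters = clustering_method.cluster(items)  # dict[int, list[ConversationSummary]]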

Example: K-means Clustering

From kura/cluster.py:325-395:
from kura.base_classes import BaseClusteringMethod
from kura.types import ConversationSummary
from sklearn.cluster import KMeans
from typing import Union
import numpy as np
import math

class KmeansClusteringModel(BaseClusteringMethod):
    def __init__(self, clusters_per_group: int = 10):
        self.clusters_per_group = clusters_per_group

    def cluster(
        self, 
        items: list[dict[str, Union[ConversationSummary, list[float]]]]
    ) -> dict[int, list[ConversationSummary]]:
        """Perform K-means clustering on embedded items."""
        if not items:
            raise ValueError("Empty items list provided")

        # Extract embeddings and data
        embeddings = [item["embedding"] for item in items]
        data = [item["item"] for item in items]
        
        # Calculate number of clusters
        n_clusters = math.ceil(len(data) / self.clusters_per_group)
        
        # Perform K-means clustering
        X = np.array(embeddings)
        kmeans = KMeans(n_clusters=n_clusters)
        cluster_labels = kmeans.fit_predict(X)
        
        # Group items by cluster
        result = {
            i: [data[j] for j in range(len(data)) if cluster_labels[j] == i]
            for i in range(n_clusters)
        }
        
        return result

Example: HDBSCAN Clustering

For density-based clustering:
from kura.base_classes import BaseClusteringMethod
from kura.types import ConversationSummary
from typing import Union
import hdbscan
import numpy as np

class HDBSCANClusteringModel(BaseClusteringMethod):
    def __init__(self, min_cluster_size: int = 5, min_samples: int = 3):
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples

    def cluster(
        self,
        items: list[dict[str, Union[ConversationSummary, list[float]]]]
    ) -> dict[int, list[ConversationSummary]]:
        """Perform HDBSCAN clustering."""
        if not items:
            raise ValueError("Empty items list provided")

        embeddings = [item["embedding"] for item in items]
        data = [item["item"] for item in items]
        
        # Perform HDBSCAN clustering
        X = np.array(embeddings)
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=self.min_cluster_size,
            min_samples=self.min_samples
        )
        cluster_labels = clusterer.fit_predict(X)
        
        # Group items by cluster (excluding noise points with label -1)
        result = {}
        for i in range(max(cluster_labels) + 1):
            result[i] = [data[j] for j in range(len(data)) if cluster_labels[j] == i]
        
        return result

Custom Cache Strategies

Cache strategies store and retrieve computed results.

Base Class Requirements

from abc import ABC, abstractmethod
from typing import Any, Optional

class CacheStrategy(ABC):
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the cache by key."""
        pass
    
    @abstractmethod
    def set(self, key: str, value: Any) -> None:
        """Store a value in the cache with the given key."""
        pass
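
The simplest possible implementation keeps everything in a dict, which is useful for tests (a sketch, not part of Kura itself; values are lost when the process exits):

class InMemoryCacheStrategy(CacheStrategy):
    """Process-local cache backed by a plain dictionary."""

    def __init__(self):
        self._store: dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        return self._store.get(key)

    def set(self, key: str, value: Any) -> None:
        self._store[key] = value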

Example: Disk Cache Strategy

From kura/cache.py:8-27:
from kura.base_classes.cache import CacheStrategy
from typing import Any, Optional
import diskcache
import os

class DiskCacheStrategy(CacheStrategy):
    """Disk-based caching strategy using diskcache."""
    
    def __init__(self, cache_dir: str):
        """
        Initialize disk cache strategy.
        
        Args:
            cache_dir: Directory path for cache storage
        """
        os.makedirs(cache_dir, exist_ok=True)
        self.cache = diskcache.Cache(cache_dir)
    
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the disk cache."""
        return self.cache.get(key)
    
    def set(self, key: str, value: Any) -> None:
        """Store a value in the disk cache."""
        self.cache.set(key, value)
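
Values written this way survive process restarts, so repeated pipeline runs can skip work already done:

cache = DiskCacheStrategy("./cache")
cache.set("greeting", {"text": "hello"})
print(cache.get("greeting"))  # {'text': 'hello'}
print(cache.get("missing"))   # None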

Example: Redis Cache Strategy

from kura.base_classes.cache import CacheStrategy
from typing import Any, Optional
import redis
import pickle

class RedisCacheStrategy(CacheStrategy):
    """Redis-based caching strategy."""
    
    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        self.redis_client = redis.Redis(host=host, port=port, db=db)
    
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from Redis cache."""
        value = self.redis_client.get(key)
        if value is None:
            return None
        return pickle.loads(value)
    
    def set(self, key: str, value: Any) -> None:
        """Store a value in Redis cache."""
        serialized = pickle.dumps(value)
        self.redis_client.set(key, serialized)

Using Custom Models

Once you’ve created custom models, use them in your pipeline:
from kura.v1 import (
    summarise_conversations,
    generate_base_clusters_from_conversation_summaries,
    CheckpointManager
)

# Initialize your custom models
embedding_model = SentenceTransformerEmbeddingModel(
    model_name="all-mpnet-base-v2",
    device="cuda"
)

summary_model = CustomSummaryModel(
    model="openai/gpt-4o-mini",
    cache_dir="./summary_cache"
)

clustering_method = HDBSCANClusteringModel(
    min_cluster_size=10
)

# Run pipeline with custom models
checkpoint_mgr = CheckpointManager("./checkpoints")

summaries = await summarise_conversations(
    conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr
)

clusters = await generate_base_clusters_from_conversation_summaries(
    summaries,
    embedding_model=embedding_model,
    clustering_method=clustering_method,
    checkpoint_manager=checkpoint_mgr
)

Best Practices

1. Implement Proper Logging

Use Python’s logging module to track model behavior:

import logging

logger = logging.getLogger(__name__)
logger.info(f"Initialized model with config: {config}")
logger.debug(f"Processing batch of {len(items)} items")

2. Add Error Handling

Handle errors gracefully with retries and fallbacks:

from tenacity import retry, wait_fixed, stop_after_attempt

@retry(wait=wait_fixed(3), stop=stop_after_attempt(3))
async def embed_batch(self, texts):
    try:
        return await self._embed(texts)
    except Exception as e:
        logger.error(f"Failed to embed batch: {e}")
        raise

3. Use Descriptive Slugs

Include configuration details in your model slug:

def slug(self) -> str:
    return f"custom:{self.model_name}-batch:{self.batch_size}-temp:{self.temperature}"

4. Support Caching

Implement caching for expensive operations:

from kura.cache import DiskCacheStrategy

self.cache = DiskCacheStrategy("./cache")
