Processing large datasets requires different strategies than small ones. This guide shows how to efficiently analyze 10,000+ conversations using checkpointing, batch processing, and multiple storage backends.

Overview

Key Strategies for Scale:
  • ✅ Use checkpointing to resume interrupted runs
  • ✅ Enable disk caching to avoid reprocessing
  • ✅ Use Parquet format for 50% space savings
  • ✅ Increase concurrent requests for faster processing
  • ✅ Use MiniBatch K-means for large datasets

Complete Large-Scale Example

large_scale_analysis.py
import asyncio
import time
from contextlib import contextmanager
from rich.console import Console

from kura.cache import DiskCacheStrategy
from kura.types import Conversation
from kura.summarisation import SummaryModel, summarise_conversations
from kura.k_means import MiniBatchKmeansClusteringMethod
from kura.cluster import (
    ClusterDescriptionModel,
    generate_base_clusters_from_conversation_summaries,
)
from kura.meta_cluster import MetaClusterModel, reduce_clusters_from_base_clusters
from kura.dimensionality import HDBUMAP, reduce_dimensionality_from_clusters
from kura.checkpoints import (
    HFDatasetCheckpointManager,
    JSONLCheckpointManager,
    MultiCheckpointManager,
    ParquetCheckpointManager,
)


class TimerManager:
    """Track timing for each pipeline step."""

    def __init__(self):
        self.timings = {}

    @contextmanager
    def timer(self, message):
        start_time = time.time()
        yield
        duration = time.time() - start_time
        self.timings[message] = duration
        print(f"{message} took {duration:.2f} seconds")

    def print_summary(self):
        print(f"\n{'=' * 60}")
        print(f"{'TIMING SUMMARY':^60}")
        print(f"{'=' * 60}")
        total_time = sum(self.timings.values())

        for operation, duration in self.timings.items():
            percentage = (duration / total_time * 100) if total_time > 0 else 0
            print(f"{operation:<40} {duration:>8.2f}s ({percentage:>5.1f}%)")

        print(f"{'-' * 60}")
        print(f"{'Total Time':<40} {total_time:>8.2f}s")
        print(f"{'=' * 60}\n")


async def main():
    console = Console()
    timer_manager = TimerManager()
    CHECKPOINT_DIR = "./large_scale_checkpoints"

    # Step 1: Configure models for high throughput
    summary_model = SummaryModel(
        console=console,
        max_concurrent_requests=100,  # Increased for speed
        cache=DiskCacheStrategy(cache_dir="./.summary_cache"),
    )

    # Use MiniBatch K-means for large-scale clustering
    minibatch_kmeans = MiniBatchKmeansClusteringMethod(
        clusters_per_group=10,  # Target items per cluster
        batch_size=1000,  # Process 1000 embeddings at a time
        max_iter=100,  # Iterations for convergence
        random_state=42,
    )

    cluster_model = ClusterDescriptionModel(
        console=console,
        clustering_method=minibatch_kmeans,
    )

    meta_cluster_model = MetaClusterModel(
        console=console,
        max_concurrent_requests=100,
    )

    dimensionality_model = HDBUMAP()

    # Step 2: Set up multi-format checkpointing
    # Saves to all formats simultaneously for redundancy and performance
    hf_manager = HFDatasetCheckpointManager(f"{CHECKPOINT_DIR}/hf", enabled=True)
    parquet_manager = ParquetCheckpointManager(
        f"{CHECKPOINT_DIR}/parquet", enabled=True
    )
    jsonl_manager = JSONLCheckpointManager(f"{CHECKPOINT_DIR}/jsonl", enabled=True)

    checkpoint_manager = MultiCheckpointManager(
        [hf_manager, parquet_manager, jsonl_manager],
        save_strategy="all_enabled",  # Save to all formats
        load_strategy="first_found",  # Load from fastest available
    )

    print(f"Using MultiCheckpointManager with {len(checkpoint_manager.managers)} backends:")
    for manager in checkpoint_manager.managers:
        print(f"  - {type(manager).__name__}: {manager.checkpoint_dir}")
    print()

    # Step 3: Load large dataset
    with timer_manager.timer("Loading conversations"):
        conversations = Conversation.from_hf_dataset(
            "ivanleomk/synthetic-gemini-conversations",
            split="train",
            max_conversations=10000,  # Scale to 10k conversations
        )

    console.print(f"[bold green]✓ Loaded {len(conversations)} conversations[/bold green]\n")

    # Step 4: Process pipeline with checkpoints
    console.print("[bold blue]Step 1: Generating summaries...[/bold blue]")
    with timer_manager.timer("Summarization"):
        summaries = await summarise_conversations(
            conversations,
            model=summary_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(f"[bold green]✓ Generated {len(summaries)} summaries[/bold green]\n")

    console.print("[bold blue]Step 2: Clustering conversations...[/bold blue]")
    with timer_manager.timer("Clustering"):
        clusters = await generate_base_clusters_from_conversation_summaries(
            summaries,
            model=cluster_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(f"[bold green]✓ Created {len(clusters)} clusters[/bold green]\n")

    console.print("[bold blue]Step 3: Meta clustering...[/bold blue]")
    with timer_manager.timer("Meta clustering"):
        reduced_clusters = await reduce_clusters_from_base_clusters(
            clusters,
            model=meta_cluster_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(f"[bold green]✓ Reduced to {len(reduced_clusters)} meta clusters[/bold green]\n")

    console.print("[bold blue]Step 4: Dimensionality reduction...[/bold blue]")
    with timer_manager.timer("Dimensionality reduction"):
        projected_clusters = await reduce_dimensionality_from_clusters(
            reduced_clusters,
            model=dimensionality_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(
        f"[bold green]✓ Projected {len(projected_clusters)} clusters[/bold green]\n"
    )

    # Step 5: Show statistics
    timer_manager.print_summary()

    # Show checkpoint statistics
    print("\nCheckpoint Statistics:")
    stats = checkpoint_manager.get_stats()
    for mgr_stat in stats["managers"]:
        print(f"  - {mgr_stat['type']}: {mgr_stat.get('checkpoint_count', 'N/A')} files")

    return summaries, clusters, reduced_clusters, projected_clusters


if __name__ == "__main__":
    results = asyncio.run(main())

Running the Example

# Install with all optional dependencies
uv pip install "kura[all]"

# Run the analysis
python large_scale_analysis.py

Performance Benchmarks

Dataset: 10,000 Conversations

First Run (No Cache):
Loading conversations                        12.45s (0.7%)
Summarization                             1,245.67s (70.2%)
Clustering                                  312.45s (17.6%)
Meta clustering                             156.23s (8.8%)
Dimensionality reduction                     48.92s (2.8%)
─────────────────────────────────────────────────────────
Total Time                                1,775.72s (29.6 minutes)

Second Run (With Cache):
Loading conversations                        12.34s (9.8%)
Summarization                                 2.45s (1.9%)  ← 99.8% faster!
Clustering                                   78.23s (62.1%)
Meta clustering                              26.45s (21.0%)
Dimensionality reduction                      6.78s (5.4%)
─────────────────────────────────────────────────────────
Total Time                                  126.25s (2.1 minutes)

14x speedup with caching! The second run takes only 2 minutes vs. 30 minutes.
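
Cached summaries live in the ./.summary_cache directory configured on SummaryModel above. To force a full re-run (for example, to reproduce the first-run timings), delete that directory:
import shutil

# Remove the on-disk summary cache so the next run re-summarizes everything
shutil.rmtree("./.summary_cache", ignore_errors=True)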

Checkpoint Format Comparison

JSONLCheckpointManager (default)

Pros:
  • Simple, human-readable format
  • No external dependencies
  • Easy to debug (can open in a text editor)
  • Works everywhere

Cons:
  • Larger file sizes
  • Slower for large datasets

Use when: getting started, debugging, or working with fewer than 1k conversations.

jsonl_manager = JSONLCheckpointManager("./checkpoints", enabled=True)
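
For comparison, the Parquet and Hugging Face dataset backends are constructed the same way (using the same constructors shown in the pipeline example above), and can be combined through MultiCheckpointManager when you want redundancy without the larger JSONL files:
parquet_manager = ParquetCheckpointManager("./checkpoints/parquet", enabled=True)
hf_manager = HFDatasetCheckpointManager("./checkpoints/hf", enabled=True)

# Save to both formats, load from whichever is found first
checkpoint_manager = MultiCheckpointManager(
    [parquet_manager, hf_manager],
    save_strategy="all_enabled",
    load_strategy="first_found",
)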

Storage Comparison

10,000 Conversation Dataset:
Checkpoint File      JSONL     Parquet   Savings
summaries            4.2 MB    2.1 MB    50%
clusters             1.8 MB    0.9 MB    50%
meta_clusters        0.6 MB    0.3 MB    50%
dimensionality       0.4 MB    0.2 MB    50%
Total                7.0 MB    3.5 MB    50%

Parquet format saves 50% disk space with faster read/write for large datasets!
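
To verify the savings on your own checkpoints, you can compare the on-disk size of each backend's directory with a few lines of standard-library Python (the paths follow the layout used in the example above):
from pathlib import Path

def dir_size_mb(path: str) -> float:
    """Total size of all files under a directory, in megabytes."""
    return sum(f.stat().st_size for f in Path(path).rglob("*") if f.is_file()) / 1e6

for backend in ("jsonl", "parquet", "hf"):
    print(f"{backend:>8}: {dir_size_mb(f'./large_scale_checkpoints/{backend}'):.1f} MB")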

Optimizing Concurrent Requests

The max_concurrent_requests parameter controls API parallelism:
# Conservative (default)
summary_model = SummaryModel(max_concurrent_requests=50)

# Balanced
summary_model = SummaryModel(max_concurrent_requests=100)

# Aggressive (if you have high rate limits)
summary_model = SummaryModel(max_concurrent_requests=200)
Benchmark (10k conversations):
Concurrency   Time      Throughput
25            45 min    3.7 conv/s
50            24 min    6.9 conv/s
100           14 min    11.9 conv/s
200           12 min    13.9 conv/s

Check your API provider's rate limits before increasing concurrency; setting it too high can trigger throttling.
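
A rough way to pick a starting value is to work backwards from your rate limit: concurrency ≈ (requests per minute) × (average request latency in minutes). A small sketch (the 500 RPM limit and 30-second latency are assumptions; plug in your own numbers):
from kura.summarisation import SummaryModel

rpm_limit = 500        # assumption: your provider's requests-per-minute limit
avg_latency_s = 30     # assumption: average seconds per summarization call

# Concurrency needed to keep the rate limit saturated (Little's law)
max_concurrent = int(rpm_limit * avg_latency_s / 60)   # 250 with the numbers above

# Leave headroom for retries and bursts by capping well below the theoretical value
summary_model = SummaryModel(max_concurrent_requests=min(max_concurrent, 100))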

Resuming Interrupted Runs

Checkpoints automatically save after each pipeline step:
# Start processing 10k conversations
python large_scale_analysis.py

# Summarization: 100% ████████████ (30 min)
# Clustering: 45% ████████------ (interrupted!)

# Resume - skips completed steps
python large_scale_analysis.py

# Loaded 10,000 summaries from checkpoint  ← Instant!
# Clustering: 45% ████████------ (resumes)

Checkpoint locations:
large_scale_checkpoints/
├── hf/
│   ├── summaries/           # HF Dataset format
│   ├── clusters/
│   └── meta_clusters/
├── parquet/
│   ├── summaries.parquet    # Parquet format (50% smaller)
│   ├── clusters.parquet
│   └── meta_clusters.parquet
└── jsonl/
    ├── summaries.jsonl      # JSONL format (human-readable)
    ├── clusters.jsonl
    └── meta_clusters.jsonl
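
Conceptually, each pipeline helper follows a load-or-compute pattern: if the checkpoint manager already holds output for that step it is returned immediately, otherwise the step runs and its result is saved. A simplified, illustrative sketch of that pattern (the load/save method names here are stand-ins, not necessarily kura's exact API):
async def run_step(step_name, compute, checkpoint_manager):
    # Illustrative only -- 'load' and 'save' are hypothetical method names
    cached = checkpoint_manager.load(step_name)
    if cached is not None:
        return cached                             # checkpoint found: skip recomputation
    result = await compute()                      # otherwise run the step...
    checkpoint_manager.save(step_name, result)    # ...and persist it for next time
    return result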

MiniBatch K-means for Scale

For 10,000+ conversations, use MiniBatch K-means instead of standard K-means:
from kura.k_means import MiniBatchKmeansClusteringMethod

minibatch_kmeans = MiniBatchKmeansClusteringMethod(
    clusters_per_group=10,   # Target conversations per cluster
    batch_size=1000,        # Process 1000 embeddings per batch
    max_iter=100,           # Maximum iterations
    random_state=42,        # Reproducibility
)

cluster_model = ClusterDescriptionModel(
    clustering_method=minibatch_kmeans
)
Benefits:
  • ✅ 3-5x faster than standard K-means
  • ✅ Constant memory usage (doesn't load all embeddings at once; see the estimate below)
  • ✅ Scales to millions of conversations
  • ✅ Similar quality to full K-means
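
The constant-memory claim comes from only one batch of embeddings being resident at a time: roughly batch_size × embedding dimension × 4 bytes (float32). A quick estimate with the settings above (the 1,536-dimensional embedding size is an assumption; substitute your model's dimension):
batch_size = 1000
embedding_dim = 1536   # assumption: dimension of your embedding model
bytes_per_float = 4    # float32

batch_memory_mb = batch_size * embedding_dim * bytes_per_float / 1e6
print(f"{batch_memory_mb:.1f} MB per batch")   # ~6.1 MB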

Common Issues

Symptom: Script crashes with MemoryError during clustering.

Solutions:
  1. Reduce batch_size in MiniBatchKmeansClusteringMethod
  2. Process data in smaller batches
  3. Use HFDatasetCheckpointManager for memory-efficient loading
# Reduce memory usage
minibatch_kmeans = MiniBatchKmeansClusteringMethod(
    batch_size=500,  # Lower batch size
)

Symptom: Many 429 errors and slow summarization.

Solutions:
  1. Reduce max_concurrent_requests
  2. Use a provider with higher rate limits
  3. Process in smaller batches
# Reduce API concurrency
summary_model = SummaryModel(
    max_concurrent_requests=25,  # Lower concurrency
)

Symptom: Running out of disk space.

Solutions:
  1. Use ParquetCheckpointManager (50% smaller)
  2. Disable JSONL checkpoints in MultiCheckpointManager
  3. Clean old cache directories
# Use only Parquet (smallest format)
checkpoint_manager = ParquetCheckpointManager("./checkpoints")

Next Steps

Custom Metadata

Extract sentiment, language, and custom properties

Comparing Models

Compare different LLM and clustering configurations

Web Interface

Explore results in an interactive dashboard

Streaming Data

Process datasets too large for memory
