Processing large datasets requires different strategies than small ones. This guide shows how to efficiently analyze 10,000+ conversations using checkpointing, batch processing, and multiple storage backends.
## Overview
**Key Strategies for Scale:**

- ✅ Use checkpointing to resume interrupted runs
- ✅ Enable disk caching to avoid reprocessing
- ✅ Use Parquet format for 50% space savings
- ✅ Increase concurrent requests for faster processing
- ✅ Use MiniBatch K-means for large datasets
## Complete Large-Scale Example
```python
import asyncio
import time
from contextlib import contextmanager

from rich.console import Console

from kura.cache import DiskCacheStrategy
from kura.types import Conversation
from kura.summarisation import SummaryModel, summarise_conversations
from kura.k_means import MiniBatchKmeansClusteringMethod
from kura.cluster import (
    ClusterDescriptionModel,
    generate_base_clusters_from_conversation_summaries,
)
from kura.meta_cluster import MetaClusterModel, reduce_clusters_from_base_clusters
from kura.dimensionality import HDBUMAP, reduce_dimensionality_from_clusters
from kura.checkpoints import (
    HFDatasetCheckpointManager,
    JSONLCheckpointManager,
    MultiCheckpointManager,
    ParquetCheckpointManager,
)


class TimerManager:
    """Track timing for each pipeline step."""

    def __init__(self):
        self.timings = {}

    @contextmanager
    def timer(self, message):
        start_time = time.time()
        yield
        duration = time.time() - start_time
        self.timings[message] = duration
        print(f"{message} took {duration:.2f} seconds")

    def print_summary(self):
        print(f"\n{'=' * 60}")
        print(f"{'TIMING SUMMARY':^60}")
        print(f"{'=' * 60}")
        total_time = sum(self.timings.values())
        for operation, duration in self.timings.items():
            percentage = (duration / total_time * 100) if total_time > 0 else 0
            print(f"{operation:<40} {duration:>8.2f}s ({percentage:>5.1f}%)")
        print(f"{'-' * 60}")
        print(f"{'Total Time':<40} {total_time:>8.2f}s")
        print(f"{'=' * 60}\n")


async def main():
    console = Console()
    timer_manager = TimerManager()
    CHECKPOINT_DIR = "./large_scale_checkpoints"

    # Step 1: Configure models for high throughput
    summary_model = SummaryModel(
        console=console,
        max_concurrent_requests=100,  # Increased for speed
        cache=DiskCacheStrategy(cache_dir="./.summary_cache"),
    )

    # Use MiniBatch K-means for large-scale clustering
    minibatch_kmeans = MiniBatchKmeansClusteringMethod(
        clusters_per_group=10,  # Target items per cluster
        batch_size=1000,  # Process 1000 embeddings at a time
        max_iter=100,  # Iterations for convergence
        random_state=42,
    )

    cluster_model = ClusterDescriptionModel(
        console=console,
        clustering_method=minibatch_kmeans,
    )

    meta_cluster_model = MetaClusterModel(
        console=console,
        max_concurrent_requests=100,
    )

    dimensionality_model = HDBUMAP()

    # Step 2: Set up multi-format checkpointing
    # Saves to all formats simultaneously for redundancy and performance
    hf_manager = HFDatasetCheckpointManager(f"{CHECKPOINT_DIR}/hf", enabled=True)
    parquet_manager = ParquetCheckpointManager(
        f"{CHECKPOINT_DIR}/parquet", enabled=True
    )
    jsonl_manager = JSONLCheckpointManager(f"{CHECKPOINT_DIR}/jsonl", enabled=True)

    checkpoint_manager = MultiCheckpointManager(
        [hf_manager, parquet_manager, jsonl_manager],
        save_strategy="all_enabled",  # Save to all formats
        load_strategy="first_found",  # Load from fastest available
    )

    print(f"Using MultiCheckpointManager with {len(checkpoint_manager.managers)} backends:")
    for manager in checkpoint_manager.managers:
        print(f"  - {type(manager).__name__}: {manager.checkpoint_dir}")
    print()

    # Step 3: Load large dataset
    with timer_manager.timer("Loading conversations"):
        conversations = Conversation.from_hf_dataset(
            "ivanleomk/synthetic-gemini-conversations",
            split="train",
            max_conversations=10000,  # Scale to 10k conversations
        )
    console.print(f"[bold green]✓ Loaded {len(conversations)} conversations[/bold green]\n")

    # Step 4: Process pipeline with checkpoints
    console.print("[bold blue]Step 1: Generating summaries...[/bold blue]")
    with timer_manager.timer("Summarization"):
        summaries = await summarise_conversations(
            conversations,
            model=summary_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(f"[bold green]✓ Generated {len(summaries)} summaries[/bold green]\n")

    console.print("[bold blue]Step 2: Clustering conversations...[/bold blue]")
    with timer_manager.timer("Clustering"):
        clusters = await generate_base_clusters_from_conversation_summaries(
            summaries,
            model=cluster_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(f"[bold green]✓ Created {len(clusters)} clusters[/bold green]\n")

    console.print("[bold blue]Step 3: Meta clustering...[/bold blue]")
    with timer_manager.timer("Meta clustering"):
        reduced_clusters = await reduce_clusters_from_base_clusters(
            clusters,
            model=meta_cluster_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(f"[bold green]✓ Reduced to {len(reduced_clusters)} meta clusters[/bold green]\n")

    console.print("[bold blue]Step 4: Dimensionality reduction...[/bold blue]")
    with timer_manager.timer("Dimensionality reduction"):
        projected_clusters = await reduce_dimensionality_from_clusters(
            reduced_clusters,
            model=dimensionality_model,
            checkpoint_manager=checkpoint_manager,
        )
    console.print(
        f"[bold green]✓ Projected {len(projected_clusters)} clusters[/bold green]\n"
    )

    # Step 5: Show statistics
    timer_manager.print_summary()

    # Show checkpoint statistics
    print("\nCheckpoint Statistics:")
    stats = checkpoint_manager.get_stats()
    for mgr_stat in stats["managers"]:
        print(f"  - {mgr_stat['type']}: {mgr_stat.get('checkpoint_count', 'N/A')} files")

    return summaries, clusters, reduced_clusters, projected_clusters


if __name__ == "__main__":
    results = asyncio.run(main())
```
## Running the Example
```bash
# Install with all optional dependencies
uv pip install "kura[all]"

# Run the analysis
python large_scale_analysis.py
```
**Dataset: 10,000 Conversations**

**First Run (No Cache):**

```
Loading conversations                       12.45s (  0.7%)
Summarization                            1,245.67s ( 70.2%)
Clustering                                 312.45s ( 17.6%)
Meta clustering                            156.23s (  8.8%)
Dimensionality reduction                    48.92s (  2.8%)
────────────────────────────────────────────────────────────
Total Time                               1,775.72s (29.6 minutes)
```
**Second Run (With Cache):**

```
Loading conversations                       12.34s (  9.8%)
Summarization                                2.45s (  1.9%)  ← 99.8% faster!
Clustering                                  78.23s ( 62.1%)
Meta clustering                             26.45s ( 21.0%)
Dimensionality reduction                     6.78s (  5.4%)
────────────────────────────────────────────────────────────
Total Time                                 126.25s (2.1 minutes)
```
14x speedup with caching! The second run takes only 2 minutes vs. 30 minutes.
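The speedup comes from `DiskCacheStrategy` persisting each summary to disk, so the second run is mostly cache hits. The idea is a persistent dict keyed on conversation content; here is a minimal sketch of that pattern using the `diskcache` package directly (the key scheme and `expensive_llm_call` are illustrative stand-ins, not Kura's internals):

```python
import hashlib

from diskcache import Cache

cache = Cache("./.summary_cache")


def expensive_llm_call(text: str) -> str:
    # Stand-in for the real summarisation request
    return text[:100]


def cached_summarise(conversation_text: str) -> str:
    # Key on a hash of the input so identical conversations hit the cache
    key = hashlib.sha256(conversation_text.encode()).hexdigest()
    if key in cache:  # second run: instant hit
        return cache[key]
    summary = expensive_llm_call(conversation_text)
    cache[key] = summary  # first run: pay once, persist to disk
    return summary
```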
## Checkpoint Formats

### JSONL

`JSONLCheckpointManager` (default)

✅ Pros:

- Simple, human-readable format
- No external dependencies
- Easy to debug (can open in a text editor)
- Works everywhere

❌ Cons:

- Larger file sizes
- Slower for large datasets

**Use when**: getting started, debugging, or < 1k conversations.

```python
jsonl_manager = JSONLCheckpointManager("./checkpoints", enabled=True)
```

### Parquet

`ParquetCheckpointManager` (recommended for large scale)

✅ Pros:

- 50% smaller file sizes
- Faster read/write for analytics
- Columnar format for efficient queries
- Industry standard

❌ Cons:

- Requires the `pyarrow` dependency
- Binary format (not human-readable)

**Use when**: 1,000+ conversations or storage is a concern.

```python
parquet_manager = ParquetCheckpointManager("./checkpoints", enabled=True)
```

### HuggingFace Datasets

`HFDatasetCheckpointManager` (for cloud integration)

✅ Pros:

- Streaming support for huge datasets
- Built-in versioning
- Push to HuggingFace Hub
- Memory-efficient

❌ Cons:

- Requires the `datasets` dependency
- More complex setup
- Slower local I/O than Parquet

**Use when**: sharing results, team collaboration, or very large datasets.

```python
hf_manager = HFDatasetCheckpointManager("./checkpoints", enabled=True)
```

### Multi (All Formats)

`MultiCheckpointManager` (best of all worlds)

✅ Pros:

- Save to multiple formats simultaneously
- Redundancy (backup protection)
- Load from the fastest available format
- Compare format performance

❌ Cons:

- Uses more disk space
- Slightly slower saves

**Use when**: critical production workloads or evaluating formats.

```python
multi_manager = MultiCheckpointManager(
    [hf_manager, parquet_manager, jsonl_manager],
    save_strategy="all_enabled",
    load_strategy="first_found",
)
```
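Conceptually, `load_strategy="first_found"` walks the managers in order and returns the first checkpoint any backend can supply. A minimal sketch of that fallback logic (the `load` method name here is hypothetical, not Kura's actual interface):

```python
def load_first_found(managers, checkpoint_name):
    """Return the first checkpoint any backend can supply, else None."""
    for manager in managers:
        data = manager.load(checkpoint_name)  # hypothetical per-backend load
        if data is not None:
            return data
    return None
```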
## Storage Comparison

**10,000 Conversation Dataset:**

| Checkpoint File | JSONL | Parquet | Savings |
| --- | --- | --- | --- |
| summaries | 4.2 MB | 2.1 MB | 50% |
| clusters | 1.8 MB | 0.9 MB | 50% |
| meta_clusters | 0.6 MB | 0.3 MB | 50% |
| dimensionality | 0.4 MB | 0.2 MB | 50% |
| **Total** | **7.0 MB** | **3.5 MB** | **50%** |
Parquet saves 50% of disk space and offers faster read/write for large datasets!
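To check these numbers on your own run, summing file sizes per backend directory is enough. A stdlib-only sketch (paths match the checkpoint layout used in the example above):

```python
from pathlib import Path


def dir_size_mb(path: str) -> float:
    """Total size of all files under a directory, in MB."""
    return sum(f.stat().st_size for f in Path(path).rglob("*") if f.is_file()) / 1e6


for backend in ("jsonl", "parquet", "hf"):
    size = dir_size_mb(f"./large_scale_checkpoints/{backend}")
    print(f"{backend:>8}: {size:.1f} MB")
```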
## Optimizing Concurrent Requests
The `max_concurrent_requests` parameter controls API parallelism:

```python
# Conservative (default)
summary_model = SummaryModel(max_concurrent_requests=50)

# Balanced
summary_model = SummaryModel(max_concurrent_requests=100)

# Aggressive (if you have high rate limits)
summary_model = SummaryModel(max_concurrent_requests=200)
```
**Benchmark (10k conversations):**

| Concurrency | Time | Throughput |
| --- | --- | --- |
| 25 | 45 min | 3.7 conv/s |
| 50 | 24 min | 6.9 conv/s |
| 100 | 14 min | 11.9 conv/s |
| 200 | 12 min | 13.9 conv/s |
Check your API provider’s rate limits before increasing concurrency; setting it too high can trigger throttling.
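Conceptually, a concurrency limit like this is just a semaphore around each request. A minimal sketch of the pattern (not Kura's actual implementation; `summarise_one` is a hypothetical stand-in for the API call):

```python
import asyncio


async def summarise_one(conversation: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for a real API call
    return conversation[:50]


async def summarise_all(conversations: list[str], max_concurrent: int = 100) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(conv: str) -> str:
        async with semaphore:  # at most max_concurrent calls in flight
            return await summarise_one(conv)

    return await asyncio.gather(*(bounded(c) for c in conversations))


results = asyncio.run(summarise_all([f"conversation {i}" for i in range(500)]))
```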
## Resuming Interrupted Runs

Checkpoints are saved automatically after each pipeline step:

```bash
# Start processing 10k conversations
python large_scale_analysis.py
# Summarization: 100% ████████████ (30 min)
# Clustering:     45% ████████---- (interrupted!)

# Resume - skips completed steps
python large_scale_analysis.py
# Loaded 10,000 summaries from checkpoint  ← Instant!
# Clustering:     45% ████████---- (resumes)
```
**Checkpoint locations:**

```
large_scale_checkpoints/
├── hf/
│   ├── summaries/            # HF Dataset format
│   ├── clusters/
│   └── meta_clusters/
├── parquet/
│   ├── summaries.parquet     # Parquet format (50% smaller)
│   ├── clusters.parquet
│   └── meta_clusters.parquet
└── jsonl/
    ├── summaries.jsonl       # JSONL format (human-readable)
    ├── clusters.jsonl
    └── meta_clusters.jsonl
```
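To see which steps have already completed before re-running, checking for the saved checkpoint files is enough. A stdlib sketch against the layout above:

```python
from pathlib import Path

jsonl_dir = Path("large_scale_checkpoints/jsonl")
for step in ("summaries", "clusters", "meta_clusters"):
    status = "done" if (jsonl_dir / f"{step}.jsonl").exists() else "pending"
    print(f"{step:<15} {status}")
```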
## MiniBatch K-means for Scale
For 10,000+ conversations, use MiniBatch K-means instead of standard K-means:
```python
from kura.cluster import ClusterDescriptionModel
from kura.k_means import MiniBatchKmeansClusteringMethod

minibatch_kmeans = MiniBatchKmeansClusteringMethod(
    clusters_per_group=10,  # Target conversations per cluster
    batch_size=1000,  # Process 1000 embeddings per batch
    max_iter=100,  # Maximum iterations
    random_state=42,  # Reproducibility
)

cluster_model = ClusterDescriptionModel(
    clustering_method=minibatch_kmeans,
)
```
**Benefits** (see the standalone sketch below):

- ✅ 3-5x faster than standard K-means
- ✅ Constant memory usage (doesn’t load all embeddings at once)
- ✅ Scales to millions of conversations
- ✅ Similar quality to full K-means
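Mini-batch K-means is the same algorithm implemented by scikit-learn's `MiniBatchKMeans` (which Kura likely wraps, though that is an assumption here). A standalone sketch on random stand-in embeddings, using scikit-learn's parameter names rather than Kura's:

```python
import numpy as np
from sklearn.cluster import MiniBatchKMeans

# Stand-in for 10k summary embeddings of dimension 384
embeddings = np.random.rand(10_000, 384)

kmeans = MiniBatchKMeans(
    n_clusters=100,   # ~10k conversations / 10 per cluster
    batch_size=1000,  # centroids updated on 1000 points at a time
    max_iter=100,
    random_state=42,
)
labels = kmeans.fit_predict(embeddings)
print(f"Assigned {len(labels)} points to {labels.max() + 1} clusters")
```

Because each update touches only one batch of points, memory stays constant regardless of dataset size, which is where the scalability claims above come from.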
## Common Issues
**Symptom**: Script crashes with `MemoryError` during clustering.

**Solutions**:

- Reduce `batch_size` in `MiniBatchKmeansClusteringMethod`
- Process data in smaller batches
- Use `HFDatasetCheckpointManager` for memory-efficient loading

```python
# Reduce memory usage
minibatch_kmeans = MiniBatchKmeansClusteringMethod(
    batch_size=500,  # Lower batch size
)
```
**Symptom**: Many 429 errors, slow summarization.

**Solutions**:

- Reduce `max_concurrent_requests`
- Use a provider with higher rate limits
- Process in smaller batches

```python
# Reduce API concurrency
summary_model = SummaryModel(
    max_concurrent_requests=25,  # Lower concurrency
)
```
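If you also make direct calls to the same provider, wrapping them with exponential backoff keeps 429s from killing a run. A generic sketch with the `tenacity` library, independent of Kura (`RateLimitError` and `call_api` are hypothetical stand-ins):

```python
import random

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


class RateLimitError(Exception):
    """Stand-in for an HTTP 429 response from your provider."""


@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=2, max=60),  # 2s, 4s, 8s, ... up to 60s
    stop=stop_after_attempt(5),
)
def call_api(prompt: str) -> str:
    if random.random() < 0.3:  # simulate occasional throttling
        raise RateLimitError
    return f"summary of: {prompt}"


print(call_api("example conversation"))
```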
**Symptom**: Running out of disk space.

**Solutions**:

- Use `ParquetCheckpointManager` (50% smaller)
- Disable JSONL checkpoints in `MultiCheckpointManager` (see the sketch after this block)
- Clean old cache directories

```python
# Use only Parquet (smallest format)
checkpoint_manager = ParquetCheckpointManager("./checkpoints")
```
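To keep the multi-backend setup but drop JSONL, the `enabled` flag shown in the main example should do it. A sketch, assuming `save_strategy="all_enabled"` skips disabled managers:

```python
from kura.checkpoints import (
    JSONLCheckpointManager,
    MultiCheckpointManager,
    ParquetCheckpointManager,
)

parquet_manager = ParquetCheckpointManager("./checkpoints/parquet", enabled=True)
jsonl_manager = JSONLCheckpointManager("./checkpoints/jsonl", enabled=False)  # disabled

checkpoint_manager = MultiCheckpointManager(
    [parquet_manager, jsonl_manager],
    save_strategy="all_enabled",  # assumption: disabled managers are not written to
    load_strategy="first_found",
)
```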
## Next Steps

- **Custom Metadata**: Extract sentiment, language, and custom properties
- **Comparing Models**: Compare different LLM and clustering configurations
- **Web Interface**: Explore results in an interactive dashboard
- **Streaming Data**: Process datasets too large for memory