from kura.checkpoints import ParquetCheckpointManager

# Parquet backend: same checkpoint API as JSONL, but columnar + compressed.
checkpoint_mgr = ParquetCheckpointManager(
    checkpoint_dir="./checkpoints",
    compression="gzip",  # or "snappy", "brotli", "zstd"
)

# Same API as JSONL
checkpoint_mgr.save_checkpoint("summaries", summaries)
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)
# Fast compression (default)
ParquetCheckpointManager(compression="snappy")

# Balanced compression
ParquetCheckpointManager(compression="gzip")

# Maximum compression
ParquetCheckpointManager(compression="zstd")

# No compression
ParquetCheckpointManager(compression=None)
from kura.checkpoints import HFDatasetCheckpointManager

# HuggingFace Datasets backend: optional Hub sync and streaming reads.
checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    hub_repo="my-username/kura-analysis",  # Optional: push to Hub
    hub_token="hf_...",  # Optional: for private repos
    streaming=False,  # Set True for datasets larger than RAM
    compression="gzip",
)

# Same API
checkpoint_mgr.save_checkpoint("summaries", summaries)
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)
# Streaming mode avoids loading the full checkpoint into RAM.
checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    streaming=True,
)

# Data is not loaded into memory all at once
loaded = checkpoint_mgr.load_checkpoint(
    "summaries",
    ConversationSummary,
    streaming=True,
)

# Process in chunks
for batch in batched(loaded, 1000):
    process_batch(batch)
Use multiple backends simultaneously for redundancy:
from kura.checkpoints import (
    MultiCheckpointManager,
    JSONLCheckpointManager,
    ParquetCheckpointManager,
)

# Fan out every save to several backends; reads fall back in order.
checkpoint_mgr = MultiCheckpointManager(
    managers=[
        JSONLCheckpointManager("./checkpoints/jsonl"),
        ParquetCheckpointManager("./checkpoints/parquet"),
    ]
)

# Saves to BOTH backends
checkpoint_mgr.save_checkpoint("summaries", summaries)

# Loads from the FIRST available backend (JSONL in this case)
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)
# List all checkpoints
checkpoints = checkpoint_mgr.list_checkpoints()
print(f"Available: {checkpoints}")
# Output: ['summaries.jsonl', 'clusters.jsonl', 'meta_clusters.jsonl']

# Check if specific checkpoint exists (names are reported with their extension)
if "summaries" in [c.replace(".jsonl", "") for c in checkpoints]:
    print("Summaries checkpoint exists")
# Delete a single checkpoint
checkpoint_mgr.delete_checkpoint("summaries")

# Delete all checkpoints
for checkpoint in checkpoint_mgr.list_checkpoints():
    checkpoint_mgr.delete_checkpoint(checkpoint)
# Migrate a checkpoint between backends: load once, save anywhere.

# Load from JSONL
jsonl_mgr = JSONLCheckpointManager("./checkpoints/jsonl")
summaries = jsonl_mgr.load_checkpoint("summaries", ConversationSummary)

# Save to Parquet
parquet_mgr = ParquetCheckpointManager("./checkpoints/parquet")
parquet_mgr.save_checkpoint("summaries", summaries)

# Save to HuggingFace Datasets
hf_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints/hf",
    hub_repo="my-org/kura-analysis",
)
hf_mgr.save_checkpoint("summaries", summaries)
import os  # needed for os.environ below (missing from the original snippet)

# Share checkpoints with a team through a (possibly private) Hub repo.
checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    hub_repo="my-org/shared-analysis",
    hub_token=os.environ["HF_TOKEN"],  # keep the token out of source control
)

# Team members can load from the same Hub repo
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)
from pathlib import Path


def cleanup_old_checkpoints(checkpoint_dir: Path, keep_latest: int = 3) -> None:
    """Keep only the N most recent checkpoints.

    Args:
        checkpoint_dir: Directory containing ``*.jsonl`` checkpoint files.
        keep_latest: Number of most recently modified files to retain.

    Files are ranked by modification time (newest first); everything past
    the first ``keep_latest`` entries is deleted.
    """
    files = sorted(
        checkpoint_dir.glob("*.jsonl"),
        key=lambda f: f.stat().st_mtime,
        reverse=True,
    )
    for file in files[keep_latest:]:
        file.unlink()
        print(f"Deleted old checkpoint: {file.name}")


cleanup_old_checkpoints(Path("./checkpoints"), keep_latest=3)