Overview

The NBA Data Preprocessing Pipeline prioritizes reproducibility through seeded random number generation, dataset fingerprinting, and comprehensive run manifests. All random operations are deterministic when a random_seed is provided.

Global Seed Management

The pipeline uses a global seed to ensure deterministic behavior across all random operations.

Setting the Global Seed

The set_global_seed function from pipeline/reproducibility.py seeds Python's random module and NumPy's legacy global generator, and sets PYTHONHASHSEED:
import os
import random
import numpy as np

def set_global_seed(seed: int) -> None:
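    # PYTHONHASHSEED is read at interpreter startup, so setting it here only
    # affects child processes, not hashing in the already-running process.
    os.environ['PYTHONHASHSEED'] = str(seed)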
    random.seed(seed)
    np.random.seed(seed)
This function is automatically called during RealTimePipelineRunner initialization.
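
A quick way to see what seeding buys you is to draw values twice under the same seed and compare. The sketch below uses only the standard library's random module so that it runs without NumPy; draw_sample is a hypothetical helper for illustration, not part of the pipeline.

```python
import random


def draw_sample(seed: int, n: int = 5) -> list[float]:
    """Seed the global `random` generator, then draw n floats (hypothetical helper)."""
    random.seed(seed)
    return [random.random() for _ in range(n)]


first = draw_sample(42)
second = draw_sample(42)
other = draw_sample(43)

print(first == second)  # same seed: the two sequences are compared
print(first == other)   # different seed: the two sequences are compared
```

The same check applies to any seeded component: re-seed, repeat the operation, and compare the outputs.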

Seeded Components

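The runner passes the configured random_seed to the components that accept one at initialization. In the excerpt below, DataIngestor and Preprocessor receive the seed, while FeatureEngineer and DataValidator are constructed without one: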
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        set_global_seed(config.random_seed)
        self.ingestor = DataIngestor(config.random_seed)
        self.preprocessor = Preprocessor(config.random_seed)
        self.engineer = FeatureEngineer()
        self.validator = DataValidator()

Dataset Fingerprinting

Every pipeline run computes a SHA-256 fingerprint of the input dataset to track data versions.

Fingerprint Generation

The DataIngestor computes fingerprints during ingestion:
df = self.ingestor.load(source)
fp = self.ingestor.fingerprint(df)
The fingerprint includes:
  • Dataset hash (SHA-256)
  • Row count
  • Column names
  • Data shape
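
The fields listed above can be sketched for a CSV file with the standard library alone. This is an illustration under stated assumptions, not the DataIngestor implementation: DatasetFingerprint and fingerprint_csv are hypothetical names, and the hash here covers the raw file bytes rather than a loaded DataFrame.

```python
import csv
import hashlib
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass(frozen=True)
class DatasetFingerprint:
    """Hypothetical container mirroring the fields listed above."""
    sha256: str
    rows: int
    columns: tuple[str, ...]
    shape: tuple[int, int]


def fingerprint_csv(path: Path) -> DatasetFingerprint:
    """Hash the raw file bytes and record the header and row count."""
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    with path.open(newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = sum(1 for _ in reader)
    return DatasetFingerprint(digest, rows, tuple(header), (rows, len(header)))


# Tiny throwaway file so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / 'sample.csv'
    sample.write_text('player,rating\nA,90\nB,85\n', encoding='utf-8')
    fp = fingerprint_csv(sample)
    fp_again = fingerprint_csv(sample)

print(asdict(fp))
```

Fingerprinting the same file twice yields equal objects, which is the property the run-to-run comparison relies on.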

Fingerprint in Reports

The fingerprint is included in all pipeline reports:
report = {
    'dataset_fingerprint': asdict(fp),
    'reproducibility': self._reproducibility_manifest(),
    # ... other results
}
Dataset fingerprinting is file-based, not registry-backed. For production use, consider implementing a centralized dataset registry.

Reproducibility Manifest

Each pipeline run generates a comprehensive reproducibility manifest.

Manifest Contents

The manifest captures all information needed to reproduce a run:
def _reproducibility_manifest(self) -> dict[str, Any]:
    return {
        'random_seed': self.config.random_seed,
        'python_version': sys.version.split()[0],
        'platform': platform.platform(),
        'config': {
            'chunk_size': self.config.chunk_size,
            'batch_size': self.config.batch_size,
            'max_memory_mb': self.config.max_memory_mb,
            'max_compute_units': self.config.max_compute_units,
            'benchmark_runs': self.config.benchmark_runs,
            'n_jobs': self.config.n_jobs,
            'adaptive_chunk_resize': self.config.adaptive_chunk_resize,
            'max_chunk_retries': self.config.max_chunk_retries,
            'spill_to_disk': self.config.spill_to_disk,
        },
        'dependencies': {
            'numpy': np.__version__,
            'pandas': pd.__version__,
            'matplotlib': matplotlib.__version__,
        },
    }

Manifest Storage

The manifest is written to metadata/run_manifest.json:
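def _write_artifacts(self, report: dict) -> None:
    # Assumption: `out` is the run's output directory, taken here from config.output_dir.
    out = Path(self.config.output_dir)
    with (out / 'metadata' / 'run_manifest.json').open('w', encoding='utf-8') as f:
        json.dump(report['reproducibility'], f, indent=2)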
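
Before rerunning an old experiment, it can help to check the recorded manifest against the current environment. The sketch below is one possible approach, assuming the manifest layout shown above; environment_mismatches is a hypothetical helper, while importlib.metadata.version is the standard-library call for looking up an installed distribution's version.

```python
import json
import sys
from importlib import metadata


def environment_mismatches(manifest: dict) -> list[str]:
    """Return readable differences between a manifest and this interpreter."""
    problems = []
    current_python = sys.version.split()[0]
    if manifest.get('python_version') != current_python:
        problems.append(
            f"python: recorded {manifest.get('python_version')}, running {current_python}"
        )
    for name, recorded in manifest.get('dependencies', {}).items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            installed = None  # package recorded in the manifest but not installed
        if installed != recorded:
            problems.append(f"{name}: recorded {recorded}, installed {installed}")
    return problems


# Stand-in for json.load() on metadata/run_manifest.json.
manifest = json.loads(json.dumps({
    'random_seed': 42,
    'python_version': sys.version.split()[0],
    'dependencies': {},
}))
print(environment_mismatches(manifest))
```

An empty list means the recorded Python and dependency versions match the running environment.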

Configuration Management

The PipelineConfig dataclass is frozen to prevent accidental modification:
@dataclass(frozen=True)
class PipelineConfig:
    """Centralized configuration for deterministic pipeline runs."""
    
    random_seed: int = 42
    chunk_size: int = 128
    batch_size: int = 256
    # ... other parameters
Using frozen=True ensures the configuration is immutable after initialization, preventing drift during execution.
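
The effect of frozen=True can be seen with a small stand-in class. DemoConfig below is hypothetical and carries only two of the documented fields; dataclasses.replace is the standard way to derive a modified copy of a frozen instance.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class DemoConfig:
    """Stand-in for PipelineConfig with two of its documented fields."""
    random_seed: int = 42
    chunk_size: int = 128


config = DemoConfig()

try:
    config.chunk_size = 256  # assignment on a frozen dataclass raises
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds a new instance and leaves the original untouched.
bigger = replace(config, chunk_size=256)
print(mutated, config.chunk_size, bigger.chunk_size)  # False 128 256
```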

Creating Configurations

from pipeline.config import PipelineConfig

# Default configuration
config = PipelineConfig()

# Custom configuration
config = PipelineConfig(
    random_seed=123,
    chunk_size=256,
    max_memory_mb=2048,
)

Bootstrap Confidence Intervals

The pipeline uses bootstrap resampling to compute confidence intervals for benchmark metrics.

Implementation

def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict[str, float]:
    if len(arr) == 0:
        return {
            'sample_size': 0,
            'mean': 0.0,
            'std': 0.0,
            'median': 0.0,
            'p95': 0.0,
            'ci95_low': 0.0,
            'ci95_high': 0.0,
        }
    
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
    
    return {
        'sample_size': int(len(arr)),
        'mean': float(arr.mean()),
        'std': float(arr.std(ddof=0)),
        'median': float(np.median(arr)),
        'p95': float(np.percentile(arr, 95)),
        'ci95_low': float(np.percentile(means, 2.5)),
        'ci95_high': float(np.percentile(means, 97.5)),
    }
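
The determinism of this approach comes from creating a private generator from the seed inside the function, so repeated calls on the same data resample identically. A dependency-free sketch of the same percentile-bootstrap idea follows; bootstrap_mean_ci is a hypothetical helper, the latency values are made up, and the index-based percentiles differ slightly from NumPy's interpolated ones.

```python
import random
import statistics


def bootstrap_mean_ci(values: list[float], seed: int, n_bootstrap: int = 400) -> tuple[float, float]:
    """Percentile bootstrap for the mean using a private, seeded generator."""
    rng = random.Random(seed)
    means = sorted(
        statistics.fmean(rng.choices(values, k=len(values)))  # resample with replacement
        for _ in range(n_bootstrap)
    )
    # Index-based percentiles without interpolation.
    low = means[int(0.025 * (n_bootstrap - 1))]
    high = means[int(0.975 * (n_bootstrap - 1))]
    return low, high


latencies = [0.91, 1.02, 0.98, 1.10, 0.95, 1.04, 0.99, 1.01]  # made-up sample
ci_a = bootstrap_mean_ci(latencies, seed=42)
ci_b = bootstrap_mean_ci(latencies, seed=42)
print(ci_a == ci_b)  # compares two runs under the same seed
```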

Usage in Benchmarks

def benchmark(self, source: str | Path | pd.DataFrame) -> dict:
    runs = []
    for _ in range(self.config.benchmark_runs):
        runs.append({
            'batch': self.run_batch(source),
            'streaming': self.run_streaming(source)
        })

    batch_latencies = np.array([r['batch']['latency_s'] for r in runs], dtype=float)
    stream_latencies = np.array([r['streaming']['latency_s'] for r in runs], dtype=float)
    
    return {
        'latency_batch': self._bootstrap_ci(batch_latencies),
        'latency_streaming': self._bootstrap_ci(stream_latencies),
        # ...
    }

Permutation Tests

The pipeline uses permutation testing to assess statistical significance of performance differences.

Implementation

def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float:
    if len(a) == 0 or len(b) == 0:
        return 1.0
    
    rng = np.random.default_rng(self.config.random_seed)
    observed = abs(float(a.mean() - b.mean()))
    combined = np.concatenate([a, b])
    
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        a_perm = shuffled[:len(a)]
        b_perm = shuffled[len(a):]
        if abs(float(a_perm.mean() - b_perm.mean())) >= observed:
            count += 1
    
    return float((count + 1) / (n_perm + 1))
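
Two boundary cases make the returned value easier to interpret: identical samples give a p-value of 1.0, and with n_perm = 1000 the smallest possible p-value is 1/1001. The sketch below reproduces the same test with the standard library only; permutation_pvalue is a hypothetical standalone version and the inputs are made up.

```python
import random
import statistics


def permutation_pvalue(a: list[float], b: list[float], seed: int, n_perm: int = 1000) -> float:
    """Two-sided permutation test on the difference of means."""
    rng = random.Random(seed)
    observed = abs(statistics.fmean(a) - statistics.fmean(b))
    combined = a + b  # new list, so the inputs are not shuffled in place
    count = 0
    for _ in range(n_perm):
        rng.shuffle(combined)
        diff = abs(statistics.fmean(combined[:len(a)]) - statistics.fmean(combined[len(a):]))
        if diff >= observed:
            count += 1
    # The +1 terms count the observed labelling itself, so p is never exactly 0.
    return (count + 1) / (n_perm + 1)


same = permutation_pvalue([1.0] * 5, [1.0] * 5, seed=42)
apart = permutation_pvalue([1.0] * 5, [10.0] * 5, seed=42)
print(same, apart)
```

For the identical samples every shuffled difference equals the observed difference of zero, so all permutations count and the result is exactly 1.0. For the well-separated samples only the rare shuffles that recreate the original split count, so the p-value is small.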

Significance Testing

Permutation tests are used to compare batch vs streaming modes:
return {
    'significance': {
        'latency_pvalue': self._permutation_pvalue(batch_latencies, stream_latencies),
        'throughput_pvalue': self._permutation_pvalue(batch_tp, stream_tp),
        'latency_mean_delta_s': float(stream_latencies.mean() - batch_latencies.mean()),
        'throughput_mean_delta_rows_s': float(stream_tp.mean() - batch_tp.mean()),
    },
}
Results are saved to benchmarks/significance_tests.csv.

Reproducible Benchmarks

The benchmark_runs parameter controls the number of repeated runs for statistical stability.

Running Benchmarks

config = PipelineConfig(
    random_seed=42,
    benchmark_runs=10,  # Run each mode 10 times
)

runner = RealTimePipelineRunner(config)
benchmark_results = runner.benchmark('data/nba2k-full.csv')

Benchmark Artifacts

Each benchmark run produces:
  • benchmarks/latency_vs_data_size.csv
  • benchmarks/throughput_vs_memory.csv
  • benchmarks/resource_vs_accuracy.csv
  • benchmarks/significance_tests.csv
  • benchmarks/latency_vs_accuracy.png
  • benchmarks/memory_vs_accuracy.png
  • benchmarks/latency_memory_accuracy.png

For example, latency_vs_data_size.csv records preprocessing latency across different dataset sizes:
sizes = [min(size_base, s) for s in (64, 128, 256, size_base)]
latency_vs_size = []
for size in sizes:
    sample = source.iloc[:size] if isinstance(source, pd.DataFrame) \
             else self.ingestor.load(source).iloc[:size]
    b = self.run_batch(sample)
    latency_vs_size.append({'rows': size, 'latency_s': b['latency_s']})

Validating Reproducibility

To verify reproducibility, run the same configuration multiple times and compare artifacts.

Validation Command

cd "NBA Data Preprocessing/task"
python -m unittest discover -s test -p 'test_*.py'

Manual Validation

1. Run Pipeline Twice

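from pathlib import Path
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

# First run: fixed seed, artifacts written under run1/
config = PipelineConfig(random_seed=42, output_dir=Path('run1'))
runner1 = RealTimePipelineRunner(config)
report1 = runner1.run_all('data/nba2k-full.csv')

# Second run: same seed, separate output directory
config = PipelineConfig(random_seed=42, output_dir=Path('run2'))
runner2 = RealTimePipelineRunner(config)
report2 = runner2.run_all('data/nba2k-full.csv')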

2. Compare Dataset Fingerprints

assert report1['dataset_fingerprint'] == report2['dataset_fingerprint']

3. Compare Model Metrics

assert abs(report1['batch']['model']['r2'] - 
           report2['batch']['model']['r2']) < 1e-6

4. Compare Chunk Metrics

chunks1 = report1['streaming']['chunk_metrics']
chunks2 = report2['streaming']['chunk_metrics']

assert len(chunks1) == len(chunks2)
for c1, c2 in zip(chunks1, chunks2):
    assert c1['rows'] == c2['rows']
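
The individual checks in the steps above can be generalized into one recursive comparison that skips fields expected to vary, such as timings. This is a sketch under assumptions: report_differences is a hypothetical helper, the ignored key list is illustrative, and the two report fragments are made up to mirror the keys used above.

```python
import math


def report_differences(a, b, ignore=('latency_s',), tol=1e-6, path=''):
    """Recursively list paths where two report structures disagree."""
    if isinstance(a, dict) and isinstance(b, dict):
        diffs = []
        for key in sorted(set(a) | set(b)):
            if key in ignore:
                continue  # timing fields vary between runs by nature
            if key not in a or key not in b:
                diffs.append(f'{path}/{key}: missing on one side')
            else:
                diffs.extend(report_differences(a[key], b[key], ignore, tol, f'{path}/{key}'))
        return diffs
    if isinstance(a, list) and isinstance(b, list):
        if len(a) != len(b):
            return [f'{path}: length {len(a)} != {len(b)}']
        diffs = []
        for i, (x, y) in enumerate(zip(a, b)):
            diffs.extend(report_differences(x, y, ignore, tol, f'{path}[{i}]'))
        return diffs
    if isinstance(a, float) and isinstance(b, float):
        return [] if math.isclose(a, b, abs_tol=tol) else [f'{path}: {a} != {b}']
    return [] if a == b else [f'{path}: {a!r} != {b!r}']


# Made-up fragments shaped like the report keys used in the steps above.
run1 = {'batch': {'latency_s': 0.51, 'model': {'r2': 0.8123}},
        'streaming': {'chunk_metrics': [{'rows': 128}, {'rows': 64}]}}
run2 = {'batch': {'latency_s': 0.49, 'model': {'r2': 0.8123}},
        'streaming': {'chunk_metrics': [{'rows': 128}, {'rows': 64}]}}
print(report_differences(run1, run2))  # []
```

An empty list means the two runs agree on everything except the ignored timing fields.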

Best Practices

Always Set random_seed

Explicitly set random_seed in all configurations, even for development.

Use Distinct output_dir

Store each run’s artifacts in a separate directory to prevent overwriting.

Keep benchmark_runs Fixed

Use the same benchmark_runs value when comparing different configurations.

Document Dependencies

The manifest captures dependency versions, but maintain a requirements.txt for installation.

Example Reproducible Run

from pathlib import Path
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

# Fixed seed and configuration
config = PipelineConfig(
    random_seed=42,
    chunk_size=128,
    benchmark_runs=5,
    output_dir=Path('artifacts/run_20260304_143000'),
)

# Run pipeline
runner = RealTimePipelineRunner(config)
report = runner.run_all('data/nba2k-full.csv')

# Artifacts written to:
# - artifacts/run_20260304_143000/metadata/run_manifest.json
# - artifacts/run_20260304_143000/reports/pipeline_report.json
# - artifacts/run_20260304_143000/benchmarks/*.csv
Include a timestamp or run ID in output_dir to automatically organize multiple experimental runs.
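
One way to generate such directory names is a small formatting helper. run_output_dir below is hypothetical; it takes the timestamp as an argument so the result is predictable, and in practice datetime.now() would be passed in.

```python
from datetime import datetime
from pathlib import Path


def run_output_dir(base: Path, now: datetime) -> Path:
    """Build a directory name like run_YYYYMMDD_HHMMSS under base (hypothetical helper)."""
    return base / f'run_{now:%Y%m%d_%H%M%S}'


example = run_output_dir(Path('artifacts'), datetime(2026, 3, 4, 14, 30, 0))
print(example.as_posix())  # artifacts/run_20260304_143000
```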

Limitations

  • Current fingerprinting is based on file hashes, not a centralized registry. Consider implementing a dataset registry for production use.
  • Profiling is stage-level and chunk-level. Fine-grained kernel-level tracing is not included.
  • RAPL energy counters may be unavailable in containers or on non-Intel platforms. Fallback estimates are used but are less accurate.
  • Parallel execution (n_jobs > 1) introduces timing variance. For strict reproducibility, use n_jobs=1.

Next Steps

Architecture

Review the overall system architecture

Execution Modes

Learn about batch vs streaming modes
