Overview
The NBA Data Preprocessing Pipeline prioritizes reproducibility through seeded random number generation, dataset fingerprinting, and comprehensive run manifests. All random operations are deterministic when a random_seed is provided.
Global Seed Management
The pipeline uses a global seed to ensure deterministic behavior across all random operations.
Setting the Global Seed
The set_global_seed function from pipeline/reproducibility.py initializes all random number generators:
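import os
import random
import numpy as np
def set_global_seed(seed: int) -> None:
    # PYTHONHASHSEED is read at interpreter startup, so setting it here
    # only affects child processes, not hashing in the current process.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)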
This function is automatically called during RealTimePipelineRunner initialization.
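A quick way to see what seeding buys is to reseed and confirm that the same draws come back. The sketch below is stdlib-only; draw_values is a hypothetical helper that is not part of the pipeline and stands in for any seeded random operation.

```python
import random


def draw_values(seed: int, n: int = 5) -> list[float]:
    """Hypothetical helper: reseed the stdlib RNG, then draw n values."""
    random.seed(seed)
    return [random.random() for _ in range(n)]


first = draw_values(42)
second = draw_values(42)
other = draw_values(43)

# Same seed gives an identical sequence; a different seed does not.
print(first == second)
print(first == other)
```

The same check applies to NumPy's generators once np.random.seed has been called with the same value.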
Seeded Components
The following components use the configured random_seed:
Pipeline Components
The runner sets the global seed, then passes it to the ingestor and preprocessor during initialization:
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        set_global_seed(config.random_seed)
        self.ingestor = DataIngestor(config.random_seed)
        self.preprocessor = Preprocessor(config.random_seed)
        self.engineer = FeatureEngineer()
        self.validator = DataValidator()
Models
Both model modes are reproducible. Batch mode uses LinearRegression, which has no random state to seed:
model = LinearRegression()  # Deterministic
Streaming mode uses SGDRegressor, seeded through random_state:
online_model = SGDRegressor(
    random_state=self.config.random_seed,
    max_iter=1,
    tol=None,
    learning_rate='invscaling'
)
Validation
Data validation operations such as sampling and drift detection use the seed:
drift_score = self.validator.drift_detection(
    cleaned,
    cleaned.sample(frac=1.0, random_state=self.config.random_seed)
)
Statistical Tests
Bootstrap and permutation tests use a seeded RNG:
def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400):
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
    # ... (the full method appears under Bootstrap Confidence Intervals)
Dataset Fingerprinting
Every pipeline run computes a SHA-256 fingerprint of the input dataset to track data versions.
Fingerprint Generation
The DataIngestor computes fingerprints during ingestion:
df = self.ingestor.load(source)
fp = self.ingestor.fingerprint(df)
The fingerprint includes:
Dataset hash (SHA-256)
Row count
Column names
Data shape
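The body of DataIngestor.fingerprint is not shown here, so the following is only a sketch of how such a fingerprint could be assembled. DatasetFingerprint, its field names, and fingerprint_rows are hypothetical, and hashing an in-memory CSV serialization is an assumption rather than the pipeline's confirmed approach.

```python
import csv
import hashlib
import io
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class DatasetFingerprint:
    """Hypothetical record; field names are assumptions, not the pipeline's."""
    sha256: str
    row_count: int
    columns: tuple[str, ...]
    shape: tuple[int, int]


def fingerprint_rows(columns: list[str], rows: list[list[str]]) -> DatasetFingerprint:
    # Serialize to CSV in memory so the hash covers both header and values.
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    digest = hashlib.sha256(buffer.getvalue().encode("utf-8")).hexdigest()
    return DatasetFingerprint(digest, len(rows), tuple(columns), (len(rows), len(columns)))


# Made-up rows, used only to exercise the helper.
columns = ["player", "rating"]
rows = [["A", "97"], ["B", "95"]]
fp_a = fingerprint_rows(columns, rows)
fp_b = fingerprint_rows(columns, rows)
fp_changed = fingerprint_rows(columns, [["A", "97"], ["B", "94"]])

print(fp_a == fp_b)                      # identical input, identical fingerprint
print(fp_a.sha256 != fp_changed.sha256)  # one edited value changes the hash
print(asdict(fp_a)["row_count"])
```

Because the hash covers the serialized values, any edit to a cell, a column name, or the row order produces a different fingerprint.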
Fingerprint in Reports
The fingerprint is included in all pipeline reports:
report = {
    'dataset_fingerprint': asdict(fp),
    'reproducibility': self._reproducibility_manifest(),
    # ... other results
}
Dataset fingerprinting is file-based, not registry-backed. For production use, consider implementing a centralized dataset registry.
Reproducibility Manifest
Each pipeline run generates a comprehensive reproducibility manifest.
Manifest Contents
The manifest captures all information needed to reproduce a run:
def _reproducibility_manifest(self) -> dict[str, Any]:
    return {
        'random_seed': self.config.random_seed,
        'python_version': sys.version.split()[0],
        'platform': platform.platform(),
        'config': {
            'chunk_size': self.config.chunk_size,
            'batch_size': self.config.batch_size,
            'max_memory_mb': self.config.max_memory_mb,
            'max_compute_units': self.config.max_compute_units,
            'benchmark_runs': self.config.benchmark_runs,
            'n_jobs': self.config.n_jobs,
            'adaptive_chunk_resize': self.config.adaptive_chunk_resize,
            'max_chunk_retries': self.config.max_chunk_retries,
            'spill_to_disk': self.config.spill_to_disk,
        },
        'dependencies': {
            'numpy': np.__version__,
            'pandas': pd.__version__,
            'matplotlib': matplotlib.__version__,
        },
    }
Manifest Storage
The manifest is written to metadata/run_manifest.json:
def _write_artifacts(self, report: dict) -> None:
    with (out / 'metadata' / 'run_manifest.json').open('w', encoding='utf-8') as f:
        json.dump(report['reproducibility'], f, indent=2)
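Once a manifest is on disk, it can be reloaded and compared against another run to spot environment or seed differences. The sketch below uses a trimmed-down manifest with only stdlib-derived fields; build_manifest and diff_manifests are hypothetical helpers, not pipeline functions.

```python
import json
import platform
import sys
import tempfile
from pathlib import Path


def build_manifest(random_seed: int) -> dict:
    """Hypothetical, trimmed-down manifest with only stdlib-derived fields."""
    return {
        "random_seed": random_seed,
        "python_version": sys.version.split()[0],
        "platform": platform.platform(),
    }


def diff_manifests(a: dict, b: dict) -> dict:
    """Return {key: (a_value, b_value)} for every top-level key that differs."""
    keys = sorted(set(a) | set(b))
    return {k: (a.get(k), b.get(k)) for k in keys if a.get(k) != b.get(k)}


# Write the manifest to a temporary metadata/run_manifest.json and read it back.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "metadata" / "run_manifest.json"
    path.parent.mkdir(parents=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(build_manifest(42), f, indent=2)
    with path.open(encoding="utf-8") as f:
        loaded = json.load(f)

same_env = diff_manifests(loaded, build_manifest(42))
changed_seed = diff_manifests(loaded, build_manifest(7))
print(same_env)
print(changed_seed)
```

An empty diff means the two runs share the recorded seed and environment; any non-empty entry points at a likely source of divergence.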
Configuration Management
The PipelineConfig dataclass is frozen to prevent accidental modification:
@dataclass(frozen=True)
class PipelineConfig:
    """Centralized configuration for deterministic pipeline runs."""
    random_seed: int = 42
    chunk_size: int = 128
    batch_size: int = 256
    # ... other parameters
Using frozen=True ensures the configuration is immutable after initialization, preventing drift during execution.
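To illustrate what immutability means in practice, the sketch below uses a minimal stand-in for PipelineConfig with three of the fields listed above. Assignment raises FrozenInstanceError, and dataclasses.replace is the standard way to derive a modified copy.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class PipelineConfig:
    """Minimal stand-in, not the pipeline's full configuration class."""
    random_seed: int = 42
    chunk_size: int = 128
    batch_size: int = 256


config = PipelineConfig()

try:
    config.chunk_size = 512  # Assignment on a frozen dataclass raises.
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds a new instance and leaves the original untouched.
larger = replace(config, chunk_size=512)
print(mutated, config.chunk_size, larger.chunk_size)
```

Deriving variants with replace keeps every experiment's configuration explicit while the original object stays unchanged.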
Creating Configurations
from pipeline.config import PipelineConfig
# Default configuration
config = PipelineConfig()
# Custom configuration
config = PipelineConfig(
    random_seed=123,
    chunk_size=256,
    max_memory_mb=2048,
)
Bootstrap Confidence Intervals
The pipeline uses bootstrap resampling to compute confidence intervals for benchmark metrics.
Implementation
def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict[str, float]:
    if len(arr) == 0:
        return {
            'sample_size': 0,
            'mean': 0.0,
            'std': 0.0,
            'median': 0.0,
            'p95': 0.0,
            'ci95_low': 0.0,
            'ci95_high': 0.0,
        }
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
    return {
        'sample_size': int(len(arr)),
        'mean': float(arr.mean()),
        'std': float(arr.std(ddof=0)),
        'median': float(np.median(arr)),
        'p95': float(np.percentile(arr, 95)),
        'ci95_low': float(np.percentile(means, 2.5)),
        'ci95_high': float(np.percentile(means, 97.5)),
    }
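The percentile bootstrap above can be tried without NumPy. The following stdlib-only sketch is an analogue, not the pipeline's code: bootstrap_ci and percentile are hypothetical helpers, random.Random replaces np.random.default_rng, and the latency values are made up.

```python
import random
import statistics


def percentile(sorted_values: list[float], q: float) -> float:
    """Linearly interpolated percentile (q in [0, 100]) of a sorted list."""
    position = (len(sorted_values) - 1) * q / 100.0
    lower = int(position)
    upper = min(lower + 1, len(sorted_values) - 1)
    weight = position - lower
    return sorted_values[lower] * (1.0 - weight) + sorted_values[upper] * weight


def bootstrap_ci(values: list[float], seed: int, n_bootstrap: int = 400) -> tuple[float, float]:
    rng = random.Random(seed)  # Fresh generator per call, so results repeat.
    means = sorted(
        statistics.fmean(rng.choices(values, k=len(values)))  # resample with replacement
        for _ in range(n_bootstrap)
    )
    return percentile(means, 2.5), percentile(means, 97.5)


latencies = [0.21, 0.19, 0.24, 0.20, 0.22, 0.23, 0.18, 0.25]  # made-up values
ci_a = bootstrap_ci(latencies, seed=42)
ci_b = bootstrap_ci(latencies, seed=42)
print(ci_a == ci_b)
print(ci_a[0] <= statistics.fmean(latencies) <= ci_a[1])
```

Because the generator is rebuilt from the seed on every call, two calls on the same data return the same interval, which is what makes benchmark reports comparable across runs.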
Usage in Benchmarks
def benchmark(self, source: str | Path | pd.DataFrame) -> dict:
    runs = []
    for _ in range(self.config.benchmark_runs):
        runs.append({
            'batch': self.run_batch(source),
            'streaming': self.run_streaming(source)
        })
    batch_latencies = np.array([r['batch']['latency_s'] for r in runs], dtype=float)
    stream_latencies = np.array([r['streaming']['latency_s'] for r in runs], dtype=float)
    return {
        'latency_batch': self._bootstrap_ci(batch_latencies),
        'latency_streaming': self._bootstrap_ci(stream_latencies),
        # ...
    }
Permutation Tests
The pipeline uses permutation testing to assess statistical significance of performance differences.
Implementation
def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float:
    if len(a) == 0 or len(b) == 0:
        return 1.0
    rng = np.random.default_rng(self.config.random_seed)
    observed = abs(float(a.mean() - b.mean()))
    combined = np.concatenate([a, b])
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        a_perm = shuffled[:len(a)]
        b_perm = shuffled[len(a):]
        if abs(float(a_perm.mean() - b_perm.mean())) >= observed:
            count += 1
    return float((count + 1) / (n_perm + 1))
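The behavior of this test at the two extremes can be checked with a stdlib-only analogue. In the sketch below, permutation_pvalue is a hypothetical stand-in that uses random.Random instead of NumPy, and the inputs are made-up numbers chosen so that one pair is identical and the other is clearly separated.

```python
import random
import statistics


def permutation_pvalue(a: list[float], b: list[float], seed: int, n_perm: int = 200) -> float:
    if not a or not b:
        return 1.0
    rng = random.Random(seed)
    observed = abs(statistics.fmean(a) - statistics.fmean(b))
    combined = a + b
    count = 0
    for _ in range(n_perm):
        shuffled = rng.sample(combined, k=len(combined))  # shuffled copy
        delta = abs(statistics.fmean(shuffled[:len(a)]) - statistics.fmean(shuffled[len(a):]))
        if delta >= observed:
            count += 1
    # Add-one smoothing keeps the estimate strictly above zero.
    return (count + 1) / (n_perm + 1)


same = [1.0, 2.0, 3.0, 4.0, 5.0]
shifted = [101.0, 102.0, 103.0, 104.0, 105.0]

p_identical = permutation_pvalue(same, same, seed=42)
p_separated = permutation_pvalue(same, shifted, seed=42)
print(p_identical)
print(p_separated < 0.05)
print(p_separated == permutation_pvalue(same, shifted, seed=42))
```

With identical groups the observed difference is zero, so every permutation counts and the p-value is exactly 1.0. With well-separated groups only a few permutations match the observed gap, and the fixed seed makes the resulting p-value repeatable.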
Significance Testing
Permutation tests are used to compare batch vs streaming modes:
return {
    'significance': {
        'latency_pvalue': self._permutation_pvalue(batch_latencies, stream_latencies),
        'throughput_pvalue': self._permutation_pvalue(batch_tp, stream_tp),
        'latency_mean_delta_s': float(stream_latencies.mean() - batch_latencies.mean()),
        'throughput_mean_delta_rows_s': float(stream_tp.mean() - batch_tp.mean()),
    },
}
Results are saved to benchmarks/significance_tests.csv.
Reproducible Benchmarks
The benchmark_runs parameter controls the number of repeated runs for statistical stability.
Running Benchmarks
config = PipelineConfig(
    random_seed=42,
    benchmark_runs=10,  # Run each mode 10 times
)
runner = RealTimePipelineRunner(config)
benchmark_results = runner.benchmark('data/nba2k-full.csv')
Benchmark Artifacts
Each benchmark run produces:
benchmarks/latency_vs_data_size.csv
benchmarks/throughput_vs_memory.csv
benchmarks/resource_vs_accuracy.csv
benchmarks/significance_tests.csv
benchmarks/latency_vs_accuracy.png
benchmarks/memory_vs_accuracy.png
benchmarks/latency_memory_accuracy.png
Latency vs Data Size
Measures preprocessing latency across different dataset sizes:
sizes = [min(size_base, s) for s in (64, 128, 256, size_base)]
latency_vs_size = []
for size in sizes:
    sample = source.iloc[:size] if isinstance(source, pd.DataFrame) \
        else self.ingestor.load(source).iloc[:size]
    b = self.run_batch(sample)
    latency_vs_size.append({'rows': size, 'latency_s': b['latency_s']})
Throughput vs Memory
Correlates memory usage with throughput:
throughput_vs_memory.append({
    'peak_memory_mb': b['peak_memory_mb'],
    'throughput_rows_s': b['throughput_rows_s']
})
Resource vs Accuracy
Tracks the relationship between resource usage and model quality:
'resource_vs_accuracy': [
    {
        'mode': r['batch']['mode'],
        'peak_memory_mb': r['batch']['peak_memory_mb'],
        'r2': r['batch']['model']['r2'],
    }
    for r in runs
]
Validating Reproducibility
To verify reproducibility, run the same configuration multiple times and compare artifacts.
Validation Command
cd "NBA Data Preprocessing/task"
python -m unittest discover -s test -p 'test_*.py'
Manual Validation
Run Pipeline Twice
config = PipelineConfig(random_seed=42, output_dir=Path('run1'))
runner1 = RealTimePipelineRunner(config)
report1 = runner1.run_all('data/nba2k-full.csv')
config = PipelineConfig(random_seed=42, output_dir=Path('run2'))
runner2 = RealTimePipelineRunner(config)
report2 = runner2.run_all('data/nba2k-full.csv')
Compare Dataset Fingerprints
assert report1['dataset_fingerprint'] == report2['dataset_fingerprint']
Compare Model Metrics
assert abs(report1['batch']['model']['r2'] -
           report2['batch']['model']['r2']) < 1e-6
Compare Chunk Metrics
chunks1 = report1['streaming']['chunk_metrics']
chunks2 = report2['streaming']['chunk_metrics']
assert len(chunks1) == len(chunks2)
for c1, c2 in zip(chunks1, chunks2):
    assert c1['rows'] == c2['rows']
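The three comparisons can be bundled into one function that reports every mismatch instead of stopping at the first failed assertion. compare_reports is a hypothetical helper, and the two reports below are made-up dictionaries containing only the keys used in the checks above.

```python
import math


def compare_reports(report1: dict, report2: dict, tol: float = 1e-6) -> list[str]:
    """Hypothetical helper: return a description of each mismatch found."""
    problems = []
    if report1["dataset_fingerprint"] != report2["dataset_fingerprint"]:
        problems.append("dataset_fingerprint differs")
    r2_a = report1["batch"]["model"]["r2"]
    r2_b = report2["batch"]["model"]["r2"]
    if not math.isclose(r2_a, r2_b, rel_tol=0.0, abs_tol=tol):
        problems.append(f"batch r2 differs: {r2_a} vs {r2_b}")
    chunks_a = report1["streaming"]["chunk_metrics"]
    chunks_b = report2["streaming"]["chunk_metrics"]
    if len(chunks_a) != len(chunks_b):
        problems.append("chunk count differs")
    for i, (c1, c2) in enumerate(zip(chunks_a, chunks_b)):
        if c1["rows"] != c2["rows"]:
            problems.append(f"chunk {i} row count differs")
    return problems


# Made-up reports with only the keys used by the checks.
base = {
    "dataset_fingerprint": {"sha256": "abc", "row_count": 256},
    "batch": {"model": {"r2": 0.9}},
    "streaming": {"chunk_metrics": [{"rows": 128}, {"rows": 128}]},
}
drifted = {**base, "batch": {"model": {"r2": 0.8}}}

print(compare_reports(base, base))
print(compare_reports(base, drifted))
```

An empty list means the two runs agree on all three checks; otherwise each entry names the part of the report that diverged.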
Best Practices
Always Set random_seed
Explicitly set random_seed in all configurations, even for development.
Use Distinct output_dir
Store each run’s artifacts in a separate directory to prevent overwriting.
Keep benchmark_runs Fixed
Use the same benchmark_runs value when comparing different configurations.
Document Dependencies
The manifest captures dependency versions, but maintain a requirements.txt for installation.
Example Reproducible Run
from pathlib import Path
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner
# Fixed seed and configuration
config = PipelineConfig(
    random_seed=42,
    chunk_size=128,
    benchmark_runs=5,
    output_dir=Path('artifacts/run_20260304_143000'),
)
# Run pipeline
runner = RealTimePipelineRunner(config)
report = runner.run_all('data/nba2k-full.csv')
# Artifacts written to:
# - artifacts/run_20260304_143000/metadata/run_manifest.json
# - artifacts/run_20260304_143000/reports/pipeline_report.json
# - artifacts/run_20260304_143000/benchmarks/*.csv
Include a timestamp or run ID in output_dir to automatically organize multiple experimental runs.
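One way to generate such a directory name is to format the current time with strftime. make_run_dir below is a hypothetical helper, not part of the pipeline; it only builds the path and does not create the directory.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional


def make_run_dir(root: Path, now: Optional[datetime] = None) -> Path:
    """Hypothetical helper: build a path of the form root/run_YYYYMMDD_HHMMSS."""
    stamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
    return root / f"run_{stamp}"


# A fixed timestamp makes the result predictable for illustration.
fixed = make_run_dir(Path("artifacts"), datetime(2026, 3, 4, 14, 30, 0))
print(fixed.as_posix())  # artifacts/run_20260304_143000
```

The resulting path can be passed as output_dir when constructing the configuration, so each run writes to its own directory.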
Limitations
Dataset Versioning is File-Based
Current fingerprinting is based on file hashes, not a centralized registry. Consider implementing a dataset registry for production use.
Profiling is stage-level and chunk-level. Fine-grained kernel-level tracing is not included.
Energy Telemetry May Be Coarse
RAPL energy counters may be unavailable in containers or on non-Intel platforms. In those cases the pipeline falls back to estimates, which are less accurate.
Timing Variance with n_jobs > 1
Parallel execution (n_jobs > 1) introduces timing variance. For strict reproducibility, use n_jobs=1.
Next Steps
Architecture: Review the overall system architecture
Execution Modes: Learn about batch vs streaming modes