Overview

The RealTimePipelineRunner class orchestrates the entire preprocessing pipeline, supporting both batch and streaming execution modes with comprehensive benchmarking and telemetry.

Class Definition

class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:32

Constructor

config (PipelineConfig, required): Pipeline configuration controlling all execution parameters

The constructor initializes all pipeline components:
  • DataIngestor for data loading
  • Preprocessor for cleaning
  • FeatureEngineer for feature creation
  • DataValidator for quality checks
  • HardwareMonitor for telemetry

Methods

run_batch

def run_batch(self, source: str | Path | pd.DataFrame) -> dict
Executes the full pipeline in batch mode (loads entire dataset into memory).
Parameters:
  • source (str | Path | pd.DataFrame, required): Data source: file path (CSV) or pandas DataFrame
Returns (dict): Comprehensive report containing:
  • mode: "batch"
  • rows: Number of rows processed
  • latency_s: Total execution time in seconds
  • throughput_rows_s: Processing throughput
  • peak_memory_mb: Peak memory usage
  • energy_estimate_j: Energy consumption estimate
  • telemetry: Hardware metrics
  • model: Model performance metrics (RMSE, R², training time)
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(random_seed=42)
runner = RealTimePipelineRunner(config)

result = runner.run_batch('nba_data.csv')
print(f"Processed {result['rows']} rows in {result['latency_s']:.2f}s")
print(f"Model R²: {result['model']['r2']:.4f}")
print(f"Peak memory: {result['peak_memory_mb']:.2f} MB")

run_streaming

def run_streaming(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> dict
Executes the pipeline in streaming mode with adaptive chunk processing.
Parameters:
  • source (str | Path | pd.DataFrame, required): Data source: file path (CSV) or pandas DataFrame
  • chunk_size (int | None, default: None): Override config chunk size for this run
  • max_memory_mb (int | None, default: None): Override config memory limit for this run
  • max_compute_units (float | None, default: None): Override config compute limit for this run
Returns (dict): Streaming execution report containing:
  • mode: "streaming"
  • rows: Total rows processed
  • latency_s: Total execution time
  • throughput_rows_s: Processing throughput
  • peak_memory_mb: Peak memory usage
  • energy_estimate_j: Energy consumption
  • telemetry: Hardware metrics
  • chunk_metrics: Per-chunk performance data
  • operator_profile_summary_s: Timing breakdown by operation
  • model: Online model performance
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    chunk_size=128,
    max_memory_mb=1024,
    adaptive_chunk_resize=True
)
runner = RealTimePipelineRunner(config)

result = runner.run_streaming(
    'large_nba_data.csv',
    chunk_size=256,  # Override config
    max_memory_mb=2048
)

print(f"Processed {len(result['chunk_metrics'])} chunks")
print(f"Average latency per chunk: {result['latency_s'] / len(result['chunk_metrics']):.3f}s")
print(f"Operator profile: {result['operator_profile_summary_s']}")
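
To make the chunked execution model concrete, here is a minimal, dependency-free sketch of splitting rows into fixed-size chunks and recording one metrics entry per chunk. It is not the library's implementation: `iter_chunks` and `stream` are hypothetical names, the per-chunk field names are illustrative, and adaptive resizing is not modeled.

```python
import time
from typing import Iterator, Sequence


def iter_chunks(rows: Sequence, chunk_size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most chunk_size rows (hypothetical helper)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def stream(rows: Sequence, chunk_size: int) -> dict:
    """Process rows chunk by chunk and collect per-chunk timings."""
    chunk_metrics = []
    started = time.perf_counter()
    for index, chunk in enumerate(iter_chunks(rows, chunk_size)):
        chunk_started = time.perf_counter()
        _ = [value * 2 for value in chunk]  # stand-in for real preprocessing
        chunk_metrics.append({
            "chunk": index,  # illustrative field names, not the documented schema
            "rows": len(chunk),
            "latency_s": time.perf_counter() - chunk_started,
        })
    return {
        "mode": "streaming",
        "rows": sum(metric["rows"] for metric in chunk_metrics),
        "latency_s": time.perf_counter() - started,
        "chunk_metrics": chunk_metrics,
    }


report = stream(list(range(1000)), chunk_size=256)
print(len(report["chunk_metrics"]), report["rows"])
```

Only one chunk is held at a time in this sketch, which is the property that lets a streaming run keep memory bounded regardless of the total row count.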

benchmark

def benchmark(self, source: str | Path | pd.DataFrame) -> dict
Runs multiple iterations of both batch and streaming modes for statistical analysis.
Parameters:
  • source (str | Path | pd.DataFrame, required): Data source for benchmarking
Returns (dict): Benchmark results containing:
  • runs: List of all individual run results
  • latency_batch: Bootstrap confidence intervals for batch latency
  • latency_streaming: Bootstrap confidence intervals for streaming latency
  • throughput_batch: Bootstrap confidence intervals for batch throughput
  • throughput_streaming: Bootstrap confidence intervals for streaming throughput
  • significance: Permutation test p-values comparing modes
  • latency_vs_data_size: Scaling analysis
  • throughput_vs_memory: Resource efficiency analysis
  • resource_vs_accuracy: Accuracy-resource tradeoffs
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    random_seed=42,
    benchmark_runs=10
)
runner = RealTimePipelineRunner(config)

benchmark_results = runner.benchmark('nba_data.csv')

print("Batch latency:")
print(f"  Mean: {benchmark_results['latency_batch']['mean']:.3f}s")
print(f"  95% CI: [{benchmark_results['latency_batch']['ci95_low']:.3f}, "
      f"{benchmark_results['latency_batch']['ci95_high']:.3f}]")

print("\nStreaming latency:")
print(f"  Mean: {benchmark_results['latency_streaming']['mean']:.3f}s")
print(f"  95% CI: [{benchmark_results['latency_streaming']['ci95_low']:.3f}, "
      f"{benchmark_results['latency_streaming']['ci95_high']:.3f}]")

print(f"\nStatistical significance (p-value): "
      f"{benchmark_results['significance']['latency_pvalue']:.4f}")

run_constraint_experiment

def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict
Tests pipeline performance across different resource constraints (chunk sizes, memory limits, compute limits).
Parameters:
  • source (str | Path | pd.DataFrame, required): Data source for experiments
Returns (dict): Experiment results containing:
  • records: List of results for each constraint combination
  • summary: Aggregate statistics including best accuracy, lowest latency, etc.
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    n_jobs=4,  # Run experiments in parallel
    chunk_size=128,
    max_memory_mb=1024,
    max_compute_units=1.0
)
runner = RealTimePipelineRunner(config)

experiments = runner.run_constraint_experiment('nba_data.csv')

print(f"Tested {len(experiments['records'])} configurations")
print(f"Best accuracy: {experiments['summary']['best_accuracy_r2']:.4f}")
print(f"Lowest latency: {experiments['summary']['lowest_latency_s']:.3f}s")
print(f"Max memory used: {experiments['summary']['max_peak_memory_mb']:.2f} MB")

# Find optimal configuration
for record in experiments['records']:
    if record['model_accuracy_r2'] == experiments['summary']['best_accuracy_r2']:
        print("\nOptimal config:")
        print(f"  Chunk size: {record['chunk_size']}")
        print(f"  Memory limit: {record['memory_limit_mb']} MB")
        print(f"  Compute limit: {record['compute_limit']}")
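
The loop above compares floats for equality and prints every record that ties for the best score. As an alternative, the best record can be selected directly with `max()`. The sketch below uses made-up records that carry only the keys shown in the example above; real records may contain more fields.

```python
# Made-up records using the keys from the example above; real values will differ.
records = [
    {"chunk_size": 64, "memory_limit_mb": 512, "compute_limit": 0.5, "model_accuracy_r2": 0.71},
    {"chunk_size": 128, "memory_limit_mb": 1024, "compute_limit": 1.0, "model_accuracy_r2": 0.78},
    {"chunk_size": 256, "memory_limit_mb": 1024, "compute_limit": 1.0, "model_accuracy_r2": 0.76},
]

# max() with a key function returns the first record with the highest R².
best = max(records, key=lambda record: record["model_accuracy_r2"])
print(best["chunk_size"], best["memory_limit_mb"], best["compute_limit"])
```

With real results, `records` would be `experiments['records']`.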

run_all

def run_all(self, source: str | Path | pd.DataFrame) -> dict
Executes the complete pipeline including batch, streaming, benchmarking, experiments, and validation.
Parameters:
  • source (str | Path | pd.DataFrame, required): Data source for comprehensive pipeline execution
Returns (dict): Comprehensive report containing:
  • dataset_fingerprint: Dataset SHA256 hash and metadata
  • reproducibility: Environment and configuration manifest
  • batch: Batch execution results
  • streaming: Streaming execution results
  • benchmark: Statistical benchmark results
  • constraint_experiment: Resource constraint experiments
  • quality: Data quality and validation reports
  • scaling: Parallel processing statistics
Also writes all artifacts to disk:
  • reports/pipeline_report.json
  • metadata/run_manifest.json
  • benchmarks/*.csv (multiple files)
  • profiles/operator_profile.csv
  • reports/streaming_chunks.jsonl
  • benchmarks/*.png (visualization plots)
Example:
from pathlib import Path
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner
import json

config = PipelineConfig(
    random_seed=42,
    benchmark_runs=5,
    n_jobs=4,
    output_dir=Path('full_pipeline_run')
)

runner = RealTimePipelineRunner(config)
report = runner.run_all('nba_salaries.csv')

print(f"Dataset: {report['dataset_fingerprint']['rows']} rows, "
      f"{report['dataset_fingerprint']['columns']} columns")
print(f"SHA256: {report['dataset_fingerprint']['sha256']}")

print("\nBatch performance:")
print(f"  Latency: {report['batch']['latency_s']:.3f}s")
print(f"  Model R²: {report['batch']['model']['r2']:.4f}")

print("\nStreaming performance:")
print(f"  Latency: {report['streaming']['latency_s']:.3f}s")
print(f"  Chunks: {len(report['streaming']['chunk_metrics'])}")

print("\nData quality:")
print(f"  Outlier rate: {report['quality']['outlier_rate']:.2%}")
print(f"  Drift score: {report['quality']['drift_score']:.4f}")
print(f"  Schema valid: {report['quality']['schema_ok']}")

# Artifacts are saved to full_pipeline_run/
with open('full_pipeline_run/reports/pipeline_report.json') as f:
    saved_report = json.load(f)
    print(f"\nReport saved with {len(saved_report)} sections")
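
The `.jsonl` extension on `reports/streaming_chunks.jsonl` suggests the JSON Lines format, with one JSON object per line. Assuming that is the case, the file can be read back with the standard library as sketched below. The record schema is not documented here, so the demo file uses placeholder field names, and `read_jsonl` is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path


def read_jsonl(path: Path) -> list[dict]:
    """Parse a JSON Lines file: one JSON object per non-empty line."""
    with path.open(encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


# Demo on a throwaway file; the field names below are placeholders, not the real schema.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "streaming_chunks.jsonl"
    demo_path.write_text(
        '{"chunk": 0, "rows": 256}\n{"chunk": 1, "rows": 232}\n',
        encoding="utf-8",
    )
    chunks = read_jsonl(demo_path)

print(len(chunks), sum(chunk["rows"] for chunk in chunks))
```

For a real run, the path would presumably be `Path('full_pipeline_run') / 'reports' / 'streaming_chunks.jsonl'`, assuming artifacts are written beneath `output_dir` as the example above implies.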

Private Methods

_hardware_adjusted_sizes

def _hardware_adjusted_sizes(
    self,
    rows: int,
    chunk_size: int | None = None,
    batch_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> tuple[int, int]
Calculates batch and chunk sizes adjusted for hardware constraints. Returns a tuple: (adjusted_batch_size, adjusted_chunk_size).

_bootstrap_ci

def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict[str, float]
Computes bootstrap confidence intervals for performance metrics. Returns a dict with the keys mean, std, median, p95, ci95_low, ci95_high, and sample_size.
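
For readers unfamiliar with the technique, the following is a minimal sketch of a percentile bootstrap for the mean that returns the same keys. It is an assumption about the general approach, not the method's actual code: it uses only the standard library instead of NumPy, `bootstrap_ci` and `percentile` are hypothetical helpers, the `seed` parameter is added for repeatability, and the population standard deviation is an arbitrary choice.

```python
import random
import statistics


def percentile(sorted_values: list[float], q: float) -> float:
    """Linear-interpolation percentile of pre-sorted values, q in [0, 100]."""
    if not sorted_values:
        raise ValueError("percentile of empty data")
    position = (len(sorted_values) - 1) * q / 100.0
    lower = int(position)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = position - lower
    return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction


def bootstrap_ci(values: list[float], n_bootstrap: int = 400, seed: int = 42) -> dict[str, float]:
    """Resample with replacement, then take the 2.5th and 97.5th percentiles of the means."""
    rng = random.Random(seed)
    n = len(values)
    resampled_means = sorted(
        statistics.fmean(rng.choices(values, k=n)) for _ in range(n_bootstrap)
    )
    ordered = sorted(values)
    return {
        "mean": statistics.fmean(values),
        "std": statistics.pstdev(values),  # population std; the library's choice is not stated
        "median": statistics.median(values),
        "p95": percentile(ordered, 95),
        "ci95_low": percentile(resampled_means, 2.5),
        "ci95_high": percentile(resampled_means, 97.5),
        "sample_size": float(n),
    }


latencies = [0.91, 1.02, 0.98, 1.10, 0.95, 1.05, 0.99, 1.01, 0.97, 1.03]  # made-up seconds
summary = bootstrap_ci(latencies)
print(summary)
```

The interval describes uncertainty in the mean latency, so it narrows as the number of benchmark runs grows, while p95 describes the spread of individual runs.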

_permutation_pvalue

def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float
Performs a permutation test to assess whether two samples differ significantly. Returns the p-value as a float.
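
As an illustration of the idea, the sketch below implements a two-sided permutation test on the absolute difference in means. The test statistic, the add-one correction, and the `seed` parameter are assumptions made for this example; the method's actual statistic is not documented here, and `permutation_pvalue` is a hypothetical standalone function using only the standard library.

```python
import random
import statistics


def permutation_pvalue(a: list[float], b: list[float], n_perm: int = 1000, seed: int = 42) -> float:
    """Two-sided permutation test on the absolute difference in means."""
    rng = random.Random(seed)
    observed = abs(statistics.fmean(a) - statistics.fmean(b))
    pooled = list(a) + list(b)
    extreme = 0
    for _ in range(n_perm):
        # Shuffle the pooled values and split them back into two groups of the original sizes.
        rng.shuffle(pooled)
        diff = abs(statistics.fmean(pooled[:len(a)]) - statistics.fmean(pooled[len(a):]))
        if diff >= observed:
            extreme += 1
    # Add-one correction keeps the estimate strictly positive.
    return (extreme + 1) / (n_perm + 1)


batch = [1.00, 1.02, 0.98, 1.01, 0.99, 1.03]      # made-up latencies (s)
streaming = [1.50, 1.48, 1.53, 1.49, 1.52, 1.51]  # made-up latencies (s)
print(permutation_pvalue(batch, streaming))
print(permutation_pvalue(batch, batch))
```

A small p-value means that randomly relabeling the runs rarely produces a gap as large as the observed one, which is evidence that the two modes genuinely differ.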

Backward Compatibility

PipelineRunner = RealTimePipelineRunner
The alias PipelineRunner is provided for backward compatibility with older code.

Notes

  • All methods respect the random_seed from config for reproducibility
  • Streaming mode uses online learning (SGDRegressor) for incremental model updates
  • Batch mode uses LinearRegression for full dataset training
  • Hardware telemetry includes RAPL energy measurements when available
  • Adaptive chunk resizing automatically handles memory pressure
  • All timing uses time.perf_counter() for high-resolution measurements
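
To illustrate what incremental model updates mean in streaming mode, here is a conceptual sketch of a one-feature linear model trained by stochastic gradient descent, one chunk at a time. It is not the pipeline's code: `OnlineLinearModel` is a hypothetical class, the data is synthetic, and the learning rate is chosen for this toy problem. scikit-learn's `SGDRegressor` exposes a comparable `partial_fit` method.

```python
import random


class OnlineLinearModel:
    """Tiny one-feature linear model updated one chunk at a time (illustrative only)."""

    def __init__(self, learning_rate: float = 0.5) -> None:
        self.weight = 0.0
        self.bias = 0.0
        self.learning_rate = learning_rate

    def partial_fit(self, xs: list[float], ys: list[float]) -> None:
        """Apply one SGD step per sample in the chunk; earlier chunks are not revisited."""
        for x, y in zip(xs, ys):
            error = (self.weight * x + self.bias) - y
            self.weight -= self.learning_rate * error * x
            self.bias -= self.learning_rate * error

    def predict(self, x: float) -> float:
        return self.weight * x + self.bias


rng = random.Random(42)  # fixed seed so the run is repeatable
model = OnlineLinearModel()

# Simulate a stream: 20 chunks of 100 samples drawn from y = 2x + 1 (synthetic data).
for _ in range(20):
    xs = [rng.random() for _ in range(100)]
    ys = [2.0 * x + 1.0 for x in xs]
    model.partial_fit(xs, ys)

print(f"weight={model.weight:.3f}, bias={model.bias:.3f}")
```

Because each chunk is discarded after its update, memory use stays constant. A batch fit such as `LinearRegression`, by contrast, needs the full dataset at once.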
