Overview

The constraint experiment reruns the streaming pipeline for every combination of chunk size, memory limit, and compute limit in a small parameter grid. For each run it records preprocessing latency, peak memory, training time, and model accuracy. This enables:
  • Finding optimal configurations for constrained environments
  • Understanding performance trade-offs
  • Validating edge and low-resource scenarios
  • Identifying Pareto-optimal points

Running Constraint Experiments

Programmatic Usage

from pipeline.streaming.engine import RealTimePipelineRunner
from pipeline.config import PipelineConfig

config = PipelineConfig(
    chunk_size=128,
    max_memory_mb=512,
    max_compute_units=1.0,
    n_jobs=1  # Disable parallelism for isolated tests
)

runner = RealTimePipelineRunner(config)
results = runner.run_constraint_experiment("nba2k-full.csv")

Command-Line Usage

The constraint experiment runs automatically as part of run_all():
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir artifacts \
  --chunk-size 128 \
  --max-memory-mb 512 \
  --max-compute-units 1.0

Experiment Methodology

Implementation

From engine.py:389-413 (Parallel and delayed follow joblib's parallel-map API):
def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict:
    rows = len(source) if isinstance(source, pd.DataFrame) else len(self.ingestor.load(source))

    # Define parameter grid
    chunk_sizes = sorted(set([max(16, min(rows, s)) for s in [64, self.config.chunk_size]]))
    memory_limits = sorted(set([256, self.config.max_memory_mb]))
    compute_limits = sorted(set([0.5, self.config.max_compute_units]))
    tasks = [(c, m, cp) for c in chunk_sizes for m in memory_limits for cp in compute_limits]

    # Run experiments (parallel or sequential)
    if self.config.n_jobs > 1:
        experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
            delayed(self._single_constraint_run)(source, c, m, cp) for c, m, cp in tasks
        )
    else:
        experiment_rows = [self._single_constraint_run(source, c, m, cp) for c, m, cp in tasks]

    results_df = pd.DataFrame(experiment_rows).sort_values(['chunk_size', 'memory_limit_mb', 'compute_limit'])
    return {
        'records': results_df.to_dict(orient='records'),
        'summary': {
            'best_accuracy_r2': float(results_df['model_accuracy_r2'].max()),
            'lowest_latency_s': float(results_df['preprocessing_latency_s'].min()),
            'lowest_training_time_s': float(results_df['training_time_s'].min()),
            'max_peak_memory_mb': float(results_df['peak_memory_mb'].max()),
        },
    }

Parameter Grid

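The experiment tests all combinations of:
  1. Chunk sizes: [64, config.chunk_size]
    • Each candidate is capped at the dataset's row count, then floored at 16 rows: max(16, min(rows, s))
    • A configured chunk size below 64 becomes the smaller value (e.g., 32 yields [32, 64])
    • Duplicates removed
  2. Memory limits: [256, config.max_memory_mb]
    • Fixed low-memory baseline: 256 MB
    • Configured limit: User-specified (if it is below 256 MB, it becomes the lower value)
  3. Compute limits: [0.5, config.max_compute_units]
    • Fixed CPU-constrained baseline: 0.5 (50% utilization)
    • Configured limit: User-specified
Total runs: Up to 2 × 2 × 2 = 8 configurations. Because each list is deduplicated, a configured value equal to its baseline shrinks the grid; for example, --max-memory-mb 256 yields 2 × 1 × 2 = 4 runs.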
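The grid construction can be reproduced outside the pipeline to check how many runs a given configuration produces. The sketch below is a standalone copy of the same expressions; constraint_grid is a hypothetical helper (not part of the pipeline package), and rows=1000 is an arbitrary example dataset size.

```python
from itertools import product


def constraint_grid(rows, chunk_size, max_memory_mb, max_compute_units):
    """Return the (chunk, memory, compute) tuples run_constraint_experiment would test.

    Hypothetical standalone copy of the grid expressions, for illustration only.
    """
    # Each chunk candidate is capped at the row count, then floored at 16 rows
    chunk_sizes = sorted({max(16, min(rows, s)) for s in (64, chunk_size)})
    # Fixed baselines plus the configured values; sets drop duplicates
    memory_limits = sorted({256, max_memory_mb})
    compute_limits = sorted({0.5, max_compute_units})
    # product() iterates in the same order as the nested comprehension in engine.py
    return list(product(chunk_sizes, memory_limits, compute_limits))


print(len(constraint_grid(rows=1000, chunk_size=128, max_memory_mb=512, max_compute_units=1.0)))  # → 8
print(len(constraint_grid(rows=1000, chunk_size=32, max_memory_mb=256, max_compute_units=1.0)))   # → 4
```

In the second call the configured memory limit equals the 256 MB baseline, so only one memory value survives deduplication.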

Single Run Implementation

From engine.py:376-387:
def _single_constraint_run(self, source: str | Path | pd.DataFrame, chunk: int, memory: int, compute: float) -> dict:
    run = self.run_streaming(source, chunk_size=chunk, max_memory_mb=memory, max_compute_units=compute)
    return {
        'chunk_size': int(chunk),
        'memory_limit_mb': int(memory),
        'compute_limit': float(compute),
        'preprocessing_latency_s': float(run['latency_s']),
        'peak_memory_mb': float(run['peak_memory_mb']),
        'training_time_s': float(run['model']['training_time_s']),
        'model_accuracy_r2': float(run['model']['r2']),
        'model_rmse': float(run['model']['rmse']),
    }

Generated Artifacts

constraint_experiment.csv

Complete results matrix in output_dir/benchmarks/:
chunk_size,memory_limit_mb,compute_limit,preprocessing_latency_s,peak_memory_mb,training_time_s,model_accuracy_r2,model_rmse
64,256,0.5,2.145,248.3,2.134,0.821,1245.67
64,256,1.0,1.987,251.7,1.976,0.824,1238.92
64,512,0.5,1.856,342.1,1.845,0.829,1229.45
64,512,1.0,1.678,347.8,1.667,0.831,1223.18
128,256,0.5,1.923,254.9,1.912,0.835,1215.34
128,256,1.0,1.734,258.2,1.723,0.837,1209.76
128,512,0.5,1.567,389.4,1.556,0.842,1198.23
128,512,1.0,1.421,394.6,1.410,0.845,1192.45
Column Descriptions:
  • chunk_size: Streaming chunk size (rows)
  • memory_limit_mb: Configured memory limit (MB)
  • compute_limit: CPU constraint factor (0.0-1.0)
  • preprocessing_latency_s: Total preprocessing time (seconds)
  • peak_memory_mb: Maximum memory usage observed (MB)
  • training_time_s: Model training time (includes preprocessing)
  • model_accuracy_r2: Regression R² score
  • model_rmse: Root mean squared error
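Because the CSV records both the configured memory_limit_mb and the observed peak_memory_mb, it is worth checking the gap between them. In the sample above, the (128, 256, 1.0) run peaks at 258.2 MB, slightly above its 256 MB limit. The standard-library sketch below computes that headroom per run; memory_headroom and report_over_limit are hypothetical helper names, and the default path assumes --output-dir artifacts.

```python
import csv


def memory_headroom(rows):
    """Yield (chunk_size, memory_limit_mb, compute_limit, headroom_mb) for each run.

    `rows` are dicts as produced by csv.DictReader, so values may be strings.
    Negative headroom means peak_memory_mb exceeded memory_limit_mb.
    """
    for r in rows:
        headroom = float(r['memory_limit_mb']) - float(r['peak_memory_mb'])
        yield (int(r['chunk_size']), int(r['memory_limit_mb']),
               float(r['compute_limit']), round(headroom, 1))


def report_over_limit(path='artifacts/benchmarks/constraint_experiment.csv'):
    """Print every run whose observed peak memory exceeded its configured limit."""
    with open(path, newline='') as f:
        for chunk, mem, compute, headroom in memory_headroom(csv.DictReader(f)):
            if headroom < 0:
                print(f"chunk={chunk} mem={mem} MB compute={compute}: "
                      f"over by {-headroom:.1f} MB")
```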

constraint_experiment_log.jsonl

JSON Lines format for programmatic analysis in output_dir/reports/:
{"chunk_size": 64, "memory_limit_mb": 256, "compute_limit": 0.5, "preprocessing_latency_s": 2.145, "peak_memory_mb": 248.3, "training_time_s": 2.134, "model_accuracy_r2": 0.821, "model_rmse": 1245.67}
{"chunk_size": 64, "memory_limit_mb": 256, "compute_limit": 1.0, "preprocessing_latency_s": 1.987, "peak_memory_mb": 251.7, "training_time_s": 1.976, "model_accuracy_r2": 0.824, "model_rmse": 1238.92}
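Each line of the log is an independent JSON object, so it can be parsed one line at a time without loading a CSV parser. A minimal standard-library sketch; parse_jsonl and fastest_run are hypothetical helper names, and the path in the usage comment assumes --output-dir artifacts.

```python
import json


def parse_jsonl(lines):
    """Parse JSON Lines: one JSON object per non-blank line."""
    return [json.loads(line) for line in lines if line.strip()]


def fastest_run(records):
    """Return the record with the lowest preprocessing latency."""
    return min(records, key=lambda r: r['preprocessing_latency_s'])


# Usage against the generated log:
# with open('artifacts/reports/constraint_experiment_log.jsonl') as f:
#     records = parse_jsonl(f)
# print(fastest_run(records))
```

With pandas, pd.read_json(path, lines=True) loads the same file directly into a DataFrame.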

Visualization Plots

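Generated in output_dir/benchmarks/ by engine.py:509-555. The snippets below are excerpts from that code, where plt is matplotlib.pyplot and experiment_df is the DataFrame of experiment results: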

latency_vs_accuracy.png

Scatter plot showing trade-off between preprocessing speed and model quality:
  • X-axis: Preprocessing latency (seconds)
  • Y-axis: Model accuracy (R²)
  • Color: Compute constraint level
plt.scatter(
    experiment_df['preprocessing_latency_s'],
    experiment_df['model_accuracy_r2'],
    c=experiment_df['compute_limit'],
    cmap='viridis',
)

memory_vs_accuracy.png

Memory consumption vs. model quality:
  • X-axis: Peak memory (MB)
  • Y-axis: Model accuracy (R²)
  • Color: Memory limit setting
plt.scatter(
    experiment_df['peak_memory_mb'],
    experiment_df['model_accuracy_r2'],
    c=experiment_df['memory_limit_mb'],
    cmap='plasma',
)

latency_memory_accuracy.png

Three-way relationship visualization:
  • X-axis: Peak memory (MB)
  • Y-axis: Preprocessing latency (seconds)
  • Color: Model accuracy (R²)
plt.scatter(
    experiment_df['peak_memory_mb'],
    experiment_df['preprocessing_latency_s'],
    c=experiment_df['model_accuracy_r2'],
    cmap='coolwarm',
)

Analyzing Results

Finding Optimal Configuration

import pandas as pd

# Load experiment results
df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Find best accuracy
best_accuracy = df.loc[df['model_accuracy_r2'].idxmax()]
print("Best accuracy configuration:")
# The row comes back as a float Series, so cast integer columns for display
print(f"  Chunk size: {int(best_accuracy['chunk_size'])}")
print(f"  Memory limit: {int(best_accuracy['memory_limit_mb'])} MB")
print(f"  Compute limit: {best_accuracy['compute_limit']}")
print(f"  R²: {best_accuracy['model_accuracy_r2']:.3f}")
print(f"  Latency: {best_accuracy['preprocessing_latency_s']:.3f}s")

# Find lowest latency
lowest_latency = df.loc[df['preprocessing_latency_s'].idxmin()]
print("\nLowest latency configuration:")
print(f"  Chunk size: {int(lowest_latency['chunk_size'])}")
print(f"  Memory limit: {int(lowest_latency['memory_limit_mb'])} MB")
print(f"  Compute limit: {lowest_latency['compute_limit']}")
print(f"  Latency: {lowest_latency['preprocessing_latency_s']:.3f}s")
print(f"  R²: {lowest_latency['model_accuracy_r2']:.3f}")
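Picking only the single best or fastest run ignores near-ties. As an illustrative alternative (leanest_near_best and its 0.015 default tolerance are hypothetical choices, not pipeline settings), accept every run whose R² is within a tolerance of the best and then take the one with the smallest peak memory:

```python
def leanest_near_best(runs, tolerance=0.015):
    """Return the lowest-peak-memory run whose R² is within `tolerance` of the best.

    `runs` is a list of dicts with model_accuracy_r2 and peak_memory_mb keys,
    e.g. df.to_dict(orient='records') for the DataFrame loaded above.
    """
    best_r2 = max(r['model_accuracy_r2'] for r in runs)
    near_best = [r for r in runs if r['model_accuracy_r2'] >= best_r2 - tolerance]
    return min(near_best, key=lambda r: r['peak_memory_mb'])


# Usage with the DataFrame loaded above:
# choice = leanest_near_best(df.to_dict(orient='records'))
```

On the sample results above, this picks (128, 256, 0.5): R² 0.835 at 254.9 MB, versus R² 0.845 at 394.6 MB for the most accurate run.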

Pareto Frontier Analysis

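Identify configurations that no other configuration dominates, i.e. no other run is at least as fast and at least as accurate while being strictly better on one of the two: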
import pandas as pd
import numpy as np

df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Multi-objective: minimize latency, maximize accuracy
df['neg_latency'] = -df['preprocessing_latency_s']  # Negate so higher is better for both columns

def is_pareto_efficient(scores):
    """Return a boolean mask of rows that no other row dominates.

    Every column of `scores` is treated as higher-is-better.
    """
    is_efficient = np.ones(scores.shape[0], dtype=bool)
    for i, s in enumerate(scores):
        if is_efficient[i]:
            # Drop remaining points that row i matches or beats on every objective
            is_efficient[is_efficient] = np.any(scores[is_efficient] > s, axis=1)
            is_efficient[i] = True  # Row i never eliminates itself
    return is_efficient

scores = df[['neg_latency', 'model_accuracy_r2']].to_numpy()
pareto_mask = is_pareto_efficient(scores)
pareto_df = df[pareto_mask]

print("Pareto-optimal configurations:")
print(pareto_df[['chunk_size', 'memory_limit_mb', 'compute_limit', 
                  'preprocessing_latency_s', 'model_accuracy_r2']])
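With only latency and R² as objectives, the sample results above have a single Pareto-optimal run, (128, 512, 1.0), because it is both the fastest and the most accurate. Adding peak memory as a third objective can widen the frontier: on the same sample, every run except (64, 512, 0.5) survives, since (128, 256, 1.0) is faster, leaner, and more accurate than it. A plain-Python sketch of that three-objective check (dominates and pareto_front are hypothetical helpers; the pairwise O(n²) scan is fine for a grid of at most 8 runs):

```python
def dominates(a, b):
    """True if run `a` dominates run `b`.

    Objectives: minimize preprocessing_latency_s and peak_memory_mb,
    maximize model_accuracy_r2. `a` must be no worse on all three and
    strictly better on at least one.
    """
    no_worse = (a['preprocessing_latency_s'] <= b['preprocessing_latency_s']
                and a['peak_memory_mb'] <= b['peak_memory_mb']
                and a['model_accuracy_r2'] >= b['model_accuracy_r2'])
    strictly_better = (a['preprocessing_latency_s'] < b['preprocessing_latency_s']
                       or a['peak_memory_mb'] < b['peak_memory_mb']
                       or a['model_accuracy_r2'] > b['model_accuracy_r2'])
    return no_worse and strictly_better


def pareto_front(runs):
    """Return the runs that no other run dominates (a run never dominates itself)."""
    return [r for r in runs if not any(dominates(other, r) for other in runs)]


# Usage with the DataFrame loaded above:
# front = pareto_front(df.to_dict(orient='records'))
```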

Memory-Constrained Scenarios

Filter results by memory availability:
import pandas as pd

df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Available memory: 256 MB
memory_budget_mb = 256
low_mem = df[df['memory_limit_mb'] == memory_budget_mb]

# Find best configuration within constraint
best_low_mem = low_mem.loc[low_mem['model_accuracy_r2'].idxmax()]
print(f"Best configuration with {memory_budget_mb} MB:")
print(f"  Chunk size: {int(best_low_mem['chunk_size'])}")
print(f"  Compute limit: {best_low_mem['compute_limit']}")
print(f"  R²: {best_low_mem['model_accuracy_r2']:.3f}")
print(f"  Peak usage: {best_low_mem['peak_memory_mb']:.1f} MB")

# Verify observed peak usage stayed within the configured limit
if best_low_mem['peak_memory_mb'] > memory_budget_mb:
    print("WARNING: Configuration exceeded memory limit!")
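Filtering on memory_limit_mb selects by configuration rather than by actual usage. In the sample data, the most accurate 256 MB run, (128, 256, 1.0), peaks at 258.2 MB, so the warning fires. An alternative sketch filters on observed peak memory instead (best_within_budget is a hypothetical helper); on the sample it picks (128, 256, 0.5), which peaks at 254.9 MB with R² 0.835.

```python
def best_within_budget(runs, budget_mb):
    """Return the most accurate run whose observed peak memory fits in budget_mb.

    Filters on peak_memory_mb (what the run actually used) rather than on
    memory_limit_mb (what it was configured with). Returns None if nothing fits.
    `runs` is a list of dicts, e.g. df.to_dict(orient='records').
    """
    fitting = [r for r in runs if r['peak_memory_mb'] <= budget_mb]
    if not fitting:
        return None
    return max(fitting, key=lambda r: r['model_accuracy_r2'])


# Usage with the DataFrame loaded above:
# choice = best_within_budget(df.to_dict(orient='records'), budget_mb=256)
```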

CPU-Constrained Scenarios

import pandas as pd

df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Available compute: 50% (e.g., shared environment)
low_cpu = df[df['compute_limit'] == 0.5]

# Compare latencies
print("Low-CPU configurations:")
print(low_cpu.sort_values('preprocessing_latency_s')[[
    'chunk_size', 'memory_limit_mb', 'preprocessing_latency_s', 'model_accuracy_r2'
]])
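Pairing each (chunk_size, memory_limit_mb) combination at its two compute levels isolates the effect of the CPU constraint. On the sample results above, dropping compute_limit from 1.0 to 0.5 raises preprocessing latency by a factor of roughly 1.08 to 1.11. A minimal sketch, assuming the grid contains compute limits 0.5 and 1.0 as in the sample (compute_slowdown is a hypothetical helper):

```python
def compute_slowdown(runs, low=0.5, high=1.0):
    """Map (chunk_size, memory_limit_mb) to latency at `low` compute / latency at `high` compute.

    `runs` is a list of dicts with chunk_size, memory_limit_mb, compute_limit and
    preprocessing_latency_s keys, e.g. df.to_dict(orient='records').
    Pairs that lack a run at either compute level are skipped.
    """
    latency = {
        (r['chunk_size'], r['memory_limit_mb'], r['compute_limit']): r['preprocessing_latency_s']
        for r in runs
    }
    ratios = {}
    for chunk, mem in sorted({(c, m) for c, m, _ in latency}):
        if (chunk, mem, low) in latency and (chunk, mem, high) in latency:
            ratios[(chunk, mem)] = latency[(chunk, mem, low)] / latency[(chunk, mem, high)]
    return ratios


# Usage with the DataFrame loaded above:
# for (chunk, mem), ratio in compute_slowdown(df.to_dict(orient='records')).items():
#     print(f"chunk={chunk} mem={mem} MB: {ratio:.2f}x latency at compute_limit=0.5")
```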

Edge Scenarios

Low-Memory Systems

Recommendations (from the hardware profiling documentation):
  • Enable --spill-to-disk
  • Reduce --chunk-size
  • Keep --max-memory-mb realistic for resident process limits
Example:
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir edge_low_mem \
  --chunk-size 32 \
  --max-memory-mb 256 \
  --spill-to-disk \
  --adaptive-chunk-resize
Validation:
import pandas as pd

chunks = pd.read_csv('edge_low_mem/benchmarks/streaming_chunks.csv')
memory_violations = chunks[chunks['memory_after_mb'] > 256]

if len(memory_violations) > 0:
    print(f"Memory exceeded in {len(memory_violations)} chunks")
    print(f"Max usage: {chunks['memory_after_mb'].max():.1f} MB")
else:
    print("All chunks within memory limit")

CPU-Constrained Systems

Recommendations:
  • Lower --max-compute-units
  • Use smaller --batch-size
  • Keep --n-jobs 1 to avoid contention
Example:
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir edge_low_cpu \
  --max-compute-units 0.25 \
  --batch-size 64 \
  --n-jobs 1
Validation:
import json

with open('edge_low_cpu/reports/pipeline_report.json') as f:
    report = json.load(f)

latency = report['streaming']['latency_s']
throughput = report['streaming']['throughput_rows_s']

print(f"Latency: {latency:.3f}s")
print(f"Throughput: {throughput:.1f} rows/s")
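To see what the CPU constraint costs, the low-CPU report can be compared against a baseline run. A minimal sketch, assuming both reports contain the streaming.latency_s and streaming.throughput_rows_s fields read above and that the baseline was produced with --output-dir artifacts; load_report and streaming_slowdown are hypothetical helpers:

```python
import json


def load_report(path):
    """Load a pipeline_report.json file."""
    with open(path) as f:
        return json.load(f)


def streaming_slowdown(baseline, constrained):
    """Compare the 'streaming' sections of two pipeline reports.

    Returns (latency ratio, throughput ratio), each constrained / baseline.
    A latency ratio above 1 and a throughput ratio below 1 mean the
    constrained run is slower.
    """
    base, cons = baseline['streaming'], constrained['streaming']
    return (cons['latency_s'] / base['latency_s'],
            cons['throughput_rows_s'] / base['throughput_rows_s'])


# Usage:
# latency_ratio, throughput_ratio = streaming_slowdown(
#     load_report('artifacts/reports/pipeline_report.json'),
#     load_report('edge_low_cpu/reports/pipeline_report.json'),
# )
```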

Minimal Resource Scenario

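Combined memory and CPU constraints. If the constraint experiment also runs under these settings (it is part of run_all()), its grid becomes chunk sizes [16, 64], memory limits [128, 256] MB, and compute limits [0.25, 0.5] for inputs of at least 64 rows: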
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir edge_minimal \
  --chunk-size 16 \
  --batch-size 32 \
  --max-memory-mb 128 \
  --max-compute-units 0.25 \
  --spill-to-disk \
  --adaptive-chunk-resize \
  --n-jobs 1

Experiment Summary

The experiment results include a summary dictionary (engine.py:407-412):
'summary': {
    'best_accuracy_r2': float(results_df['model_accuracy_r2'].max()),
    'lowest_latency_s': float(results_df['preprocessing_latency_s'].min()),
    'lowest_training_time_s': float(results_df['training_time_s'].min()),
    'max_peak_memory_mb': float(results_df['peak_memory_mb'].max()),
}
Access via JSON report:
import json

with open('artifacts/reports/pipeline_report.json') as f:
    report = json.load(f)

summary = report['constraint_experiment']['summary']
print(f"Best accuracy achieved: {summary['best_accuracy_r2']:.3f}")
print(f"Lowest latency achieved: {summary['lowest_latency_s']:.3f}s")
print(f"Max memory used: {summary['max_peak_memory_mb']:.1f} MB")
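The summary can also be recomputed from constraint_experiment.csv, for example to cross-check the report or to summarize a subset of runs (such as only those with memory_limit_mb == 256). A standard-library sketch that mirrors the four aggregations above; summarize and summarize_csv are hypothetical helper names, and the default path assumes --output-dir artifacts:

```python
import csv


def summarize(rows):
    """Recompute the constraint-experiment summary from result rows.

    Mirrors the summary dict in run_constraint_experiment; values from
    csv.DictReader are strings, so each is converted to float.
    """
    def col(name):
        return [float(r[name]) for r in rows]

    return {
        'best_accuracy_r2': max(col('model_accuracy_r2')),
        'lowest_latency_s': min(col('preprocessing_latency_s')),
        'lowest_training_time_s': min(col('training_time_s')),
        'max_peak_memory_mb': max(col('peak_memory_mb')),
    }


def summarize_csv(path='artifacts/benchmarks/constraint_experiment.csv'):
    """Load the results CSV and summarize every run in it."""
    with open(path, newline='') as f:
        return summarize(list(csv.DictReader(f)))
```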

Parallel Execution

Constraint experiments run in parallel when n_jobs > 1 (engine.py:397-402):
from pipeline.streaming.engine import RealTimePipelineRunner
from pipeline.config import PipelineConfig

config = PipelineConfig(
    n_jobs=4,  # Use 4 parallel workers
    chunk_size=128,
    max_memory_mb=512
)

runner = RealTimePipelineRunner(config)
results = runner.run_constraint_experiment("nba2k-full.csv")
Note: Parallel runs compete for CPU and memory, which can distort latency and peak-memory measurements. For precise profiling, use n_jobs=1.

Best Practices

  1. Run experiments on representative data: Use production-scale samples
  2. Test edge cases separately: Minimal resource scenarios may need custom grids
  3. Validate constraints: Verify peak usage doesn’t exceed limits
  4. Document findings: Save experiment reports for comparison
  5. Use Pareto analysis: Identify optimal trade-offs, not just best single metric
  6. Consider deployment environment: Match constraints to target hardware

Limitations

  • Fixed parameter grid: Only tests predefined combinations
  • No hyperparameter tuning: Model parameters are fixed
  • Independent runs: Each run is treated as independent, so warm-up effects are not modeled
  • Coarse granularity: At most 2 values per parameter
