Overview
This guide provides optimization strategies based on profiling data and hardware constraints. Use the hardware profiling outputs to identify bottlenecks, then apply these strategies to improve performance.
Hardware-Adjusted Sizing
The pipeline automatically adjusts chunk and batch sizes based on available resources.
Implementation
From engine.py:42-60:
def _hardware_adjusted_sizes(
    self,
    rows: int,
    chunk_size: int | None = None,
    batch_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> tuple[int, int]:
    chunk_base = self.config.chunk_size if chunk_size is None else chunk_size
    batch_base = self.config.batch_size if batch_size is None else batch_size
    memory_cap = self.config.max_memory_mb if max_memory_mb is None else max_memory_mb
    compute_cap = self.config.max_compute_units if max_compute_units is None else max_compute_units
    memory_factor = max(0.1, min(1.0, memory_cap / 1024))
    compute_factor = max(0.1, min(1.0, compute_cap))
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)
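Memory factor: max(0.1, min(1.0, max_memory_mb / 1024))
- 64 MB → factor = 0.1 (floor)
- 512 MB → factor = 0.5
- 1024 MB → factor = 1.0
- 2048 MB → factor = 1.0 (capped)
Compute factor: max(0.1, min(1.0, max_compute_units))
- 0.5 → factor = 0.5 (half available cores)
- 1.0 → factor = 1.0 (all cores)
Combined scale: memory_factor × compute_factor
Adjusted sizes (truncated to an integer, floored at 16 rows, then capped at the number of input rows; the function returns them in the order batch, chunk):
chunk_size = min(rows, max(16, int(base_chunk_size × scale)))
batch_size = min(rows, max(16, int(base_batch_size × scale)))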
Example
With chunk_size=128, max_memory_mb=512, max_compute_units=0.5:
memory_factor = 512 / 1024 = 0.5
compute_factor = 0.5
scale = 0.5 × 0.5 = 0.25
adjusted_chunk = max(16, 128 × 0.25) = max(16, 32) = 32
The chunk size is reduced from 128 to 32 rows to fit the memory and compute constraints.
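The arithmetic above can be checked outside the pipeline. The following is a minimal standalone sketch of the same clamping rule; `adjusted_sizes` is a hypothetical free function written for illustration (it takes every value explicitly instead of reading `self.config`) and is not part of engine.py.

```python
def adjusted_sizes(rows, chunk_size, batch_size, max_memory_mb, max_compute_units):
    """Standalone restatement of the sizing rule quoted from engine.py."""
    memory_factor = max(0.1, min(1.0, max_memory_mb / 1024))
    compute_factor = max(0.1, min(1.0, max_compute_units))
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_size * scale))
    adjusted_chunk = max(16, int(chunk_size * scale))
    # Same return order as the original: (batch, chunk), each capped at `rows`.
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)


batch, chunk = adjusted_sizes(
    rows=1000, chunk_size=128, batch_size=256,
    max_memory_mb=512, max_compute_units=0.5,
)
print(batch, chunk)  # 64 32
```

Note that very small inputs are bounded by the row count rather than the 16-row floor, because the cap at `rows` is applied last.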
Optimization by Scenario
Low-Memory Systems
Indicators:
- Frequent memory_exceeded=true in streaming_chunks.csv
- High retries count
- Process killed by OOM (Out of Memory)
Strategies:
- Enable disk spilling:
config = PipelineConfig(spill_to_disk=True)
From engine.py:260-266, this saves intermediate results to disk:
if self.config.spill_to_disk:
    x_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_X.csv'
    y_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_y.csv'
    X_chunk.to_csv(x_path, index=False)
    y_chunk.to_frame('salary').to_csv(y_path, index=False)
- Reduce chunk size:
config = PipelineConfig(
    chunk_size=64,      # Smaller chunks
    max_memory_mb=256   # Realistic limit
)
- Enable adaptive chunk resizing:
config = PipelineConfig(
    adaptive_chunk_resize=True,
    max_chunk_retries=3
)
From engine.py:251-258, chunks are automatically split when memory is exceeded:
if memory_exceeded and self.config.adaptive_chunk_resize and retries < self.config.max_chunk_retries:
    retries += 1
    split = max(16, len(chunk) // 2)
    pending_chunks.insert(0, chunk.iloc[split:].copy())
    chunk = chunk.iloc[:split].copy()
    current_chunk_size = split
    continue
- Use streaming mode exclusively:
runner = RealTimePipelineRunner(config)
result = runner.run_streaming(data, max_memory_mb=256)
Example configuration:
python run_pipeline.py \
--chunk-size 64 \
--max-memory-mb 256 \
--spill-to-disk \
--adaptive-chunk-resize \
--max-chunk-retries 3
CPU-Constrained Systems
Indicators:
- High cpu_percent in telemetry
- Low throughput despite adequate memory
- Long feature_engineering_s or encode_scale_s times
Strategies:
- Reduce compute allocation:
config = PipelineConfig(
    max_compute_units=0.5,  # Use 50% of cores
    n_jobs=1                # Disable parallelism
)
- Use smaller batch sizes:
config = PipelineConfig(
    batch_size=128  # Reduce from default 256
)
- Disable parallel processing:
config = PipelineConfig(n_jobs=1)
From engine.py:397-402, the constraint experiment uses this setting:
if self.config.n_jobs > 1:
    experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
        delayed(self._single_constraint_run)(source, c, m, cp) for c, m, cp in tasks
    )
else:
    experiment_rows = [self._single_constraint_run(source, c, m, cp) for c, m, cp in tasks]
Example configuration:
python run_pipeline.py \
--max-compute-units 0.5 \
--batch-size 128 \
--n-jobs 1
Encode Stage Bottleneck
Indicators:
- encode_scale_s dominates in operator_profile.csv
- Time increases non-linearly with chunk size
Strategies:
- Reduce chunk size to improve cache locality:
config = PipelineConfig(chunk_size=64)
- Profile chunk size impact:
import pandas as pd
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

data = pd.read_csv('nba2k-full.csv')
# Compare end-to-end streaming latency across candidate chunk sizes
for size in [32, 64, 128, 256]:
    runner = RealTimePipelineRunner(PipelineConfig(chunk_size=size))
    result = runner.run_streaming(data, chunk_size=size)
    print(f"Chunk size {size}: {result['latency_s']:.3f}s")
- Monitor cache pressure:
Feature Engineering Bottleneck
Indicators:
- feature_engineering_s dominates in operator_profile.csv
- High CPU usage during this stage
Strategies:
- Use simpler rolling aggregations:
  - The build_features_streaming() method computes rolling statistics
  - Consider reducing the window size or number of features
- Pre-compute features offline:
  - For batch processing, compute features once and cache
- Parallelize feature computation:
  - Increase n_jobs if memory allows
I/O Bottleneck
Indicators:
- Low estimated_input_bandwidth_mb_s (< 100 MB/s)
- High latency despite low CPU and memory usage
- Divergence between bandwidth estimate and throughput
Strategies:
- Increase chunk size to amortize I/O:
config = PipelineConfig(chunk_size=256)
- Use faster storage:
  - SSD instead of HDD
  - Local storage instead of network drives
- Pre-load data into memory:
import pandas as pd
data = pd.read_csv('nba2k-full.csv') # Load once
runner.run_streaming(data) # Pass DataFrame
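To judge whether storage is the limiting factor, it can help to measure raw sequential read throughput of the input file independently of the pipeline. The sketch below is an assumption-laden illustration: `read_bandwidth_mb_s` is a hypothetical helper, and it is not how the pipeline derives estimated_input_bandwidth_mb_s.

```python
import time


def read_bandwidth_mb_s(path, block_size=1024 * 1024):
    """Read the file sequentially and return throughput in MB/s (1 MB = 2**20 bytes)."""
    total_bytes = 0
    start = time.perf_counter()
    with open(path, 'rb') as f:
        while True:
            block = f.read(block_size)
            if not block:
                break
            total_bytes += len(block)
    # Guard against a zero-length interval on very small files.
    elapsed = max(time.perf_counter() - start, 1e-9)
    return (total_bytes / (1024 * 1024)) / elapsed
```

A call such as `read_bandwidth_mb_s('nba2k-full.csv')` gives a rough upper bound on what the pipeline can ingest from that location. Repeated runs are often served from the operating system's page cache, so the first read after a cold start is usually the more representative number.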
Multi-Objective Optimization
Latency vs. Accuracy
From the benchmark visualizations (engine.py:512-525):
plt.scatter(
experiment_df['preprocessing_latency_s'],
experiment_df['model_accuracy_r2'],
c=experiment_df['compute_limit'],
cmap='viridis',
)
plt.xlabel('Preprocessing latency (s)')
plt.ylabel('Model accuracy (R²)')
plt.title('Latency vs Accuracy')
Trade-offs:
- Smaller chunks → faster iteration but potential accuracy loss
- Larger chunks → better feature context but higher latency
Finding optimal point:
import pandas as pd
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

results = []
for chunk_size in [32, 64, 128, 256]:
    config = PipelineConfig(chunk_size=chunk_size)
    runner = RealTimePipelineRunner(config)
    r = runner.run_streaming(data, chunk_size=chunk_size)
    results.append({
        'chunk_size': chunk_size,
        'latency': r['latency_s'],
        'r2': r['model']['r2']
    })
df = pd.DataFrame(results)
print(df)
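A sweep like this yields a table, not a decision. One possible selection rule, offered here as an assumption and not as the project's method, is to take the lowest-latency configuration whose R² is within a tolerance of the best observed R². `pick_operating_point` and its `r2_tolerance` parameter are hypothetical, and the sample numbers are placeholders, not benchmark output.

```python
def pick_operating_point(results, r2_tolerance=0.02):
    """Return the lowest-latency entry whose r2 is within r2_tolerance of the best r2."""
    best_r2 = max(r['r2'] for r in results)
    acceptable = [r for r in results if r['r2'] >= best_r2 - r2_tolerance]
    return min(acceptable, key=lambda r: r['latency'])


# Placeholder numbers for illustration only, not measured results.
sample = [
    {'chunk_size': 32, 'latency': 0.40, 'r2': 0.70},
    {'chunk_size': 64, 'latency': 0.55, 'r2': 0.79},
    {'chunk_size': 128, 'latency': 0.80, 'r2': 0.80},
    {'chunk_size': 256, 'latency': 1.30, 'r2': 0.80},
]
print(pick_operating_point(sample)['chunk_size'])  # 64
```

The function accepts the same list of dictionaries that the sweep builds in `results`, so it can be applied directly before converting to a DataFrame.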
Memory vs. Accuracy
From the benchmark visualizations (engine.py:527-540):
plt.scatter(
experiment_df['peak_memory_mb'],
experiment_df['model_accuracy_r2'],
c=experiment_df['memory_limit_mb'],
cmap='plasma',
)
plt.xlabel('Peak memory (MB)')
plt.ylabel('Model accuracy (R²)')
Trade-offs:
- Lower memory limits require smaller chunks
- May reduce model quality for streaming models
- Batch models maintain accuracy but can’t run when the dataset does not fit within the memory limit
Strategy: Use the constraint experiment to find the Pareto frontier.
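As a minimal sketch of what finding the frontier could look like, the function below keeps only the runs that no other run beats on both lower peak memory and higher R². The column names match the plot above; `pareto_frontier` itself is a hypothetical helper, and the sample rows are placeholders, not experiment results.

```python
def pareto_frontier(rows, cost_key='peak_memory_mb', gain_key='model_accuracy_r2'):
    """Keep rows that are not dominated (lower cost and higher gain are both better)."""
    frontier = []
    best_gain = float('-inf')
    # Scan by increasing cost; on equal cost, the higher-gain row is seen first.
    for row in sorted(rows, key=lambda r: (r[cost_key], -r[gain_key])):
        if row[gain_key] > best_gain:
            frontier.append(row)
            best_gain = row[gain_key]
    return frontier


# Placeholder rows for illustration only.
runs = [
    {'peak_memory_mb': 200, 'model_accuracy_r2': 0.60},
    {'peak_memory_mb': 300, 'model_accuracy_r2': 0.75},
    {'peak_memory_mb': 350, 'model_accuracy_r2': 0.70},  # dominated by the 300 MB run
    {'peak_memory_mb': 500, 'model_accuracy_r2': 0.80},
    {'peak_memory_mb': 700, 'model_accuracy_r2': 0.80},  # same R², more memory
]
print([r['peak_memory_mb'] for r in pareto_frontier(runs)])  # [200, 300, 500]
```

With a DataFrame of experiment results, the same function can be fed `experiment_df.to_dict('records')`.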
Tuning Workflow
Step 1: Establish Baseline
python run_pipeline.py \
--input ../data/nba2k-full.csv \
--output-dir baseline \
--benchmark-runs 3
Step 2: Identify Bottleneck
import pandas as pd
profile = pd.read_csv('baseline/profiles/operator_profile.csv')
means = profile[['preprocess_s', 'feature_engineering_s',
'feature_selection_s', 'encode_scale_s']].mean()
print("Bottleneck:", means.idxmax())
Step 3: Apply Targeted Optimization
Based on bottleneck:
- preprocess_s: Reduce chunk size or optimize cleaning logic
- feature_engineering_s: Simplify features or increase parallelism
- feature_selection_s: Reduce correlation threshold
- encode_scale_s: Reduce chunk size (cache pressure)
Step 4: Validate Improvement
python run_pipeline.py \
--input ../data/nba2k-full.csv \
--output-dir optimized \
--benchmark-runs 3 \
--chunk-size 64 # example optimization
Step 5: Compare Results
import json

with open('baseline/reports/pipeline_report.json') as f:
    baseline = json.load(f)
with open('optimized/reports/pipeline_report.json') as f:
    optimized = json.load(f)
print(f"Baseline latency: {baseline['streaming']['latency_s']:.3f}s")
print(f"Optimized latency: {optimized['streaming']['latency_s']:.3f}s")
print(f"Speedup: {baseline['streaming']['latency_s'] / optimized['streaming']['latency_s']:.2f}x")
Advanced Techniques
Dynamic Chunk Sizing
The adaptive chunk resize feature (engine.py:251-258) automatically reduces chunk size when memory is exceeded:
config = PipelineConfig(
adaptive_chunk_resize=True,
max_chunk_retries=3,
max_memory_mb=512
)
How it works:
- Chunk exceeds memory limit
- Split chunk in half (the split point is never below 16 rows)
- Process first half
- Add second half back to the front of the queue
- Retry up to max_chunk_retries times
Backoff strategy (engine.py:257):
time.sleep(min(0.05 * retries, 0.2)) # 50ms, 100ms, 150ms, max 200ms
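The split-and-requeue loop with backoff can be sketched in isolation. This is a simplified illustration using plain lists in place of DataFrames. `process_with_splitting` and the `exceeds_memory` callback are hypothetical, the retry counter is assumed to restart for each chunk taken from the queue, and the guard that stops splitting once a chunk cannot shrink is an addition not shown in the engine.py excerpt.

```python
import time
from collections import deque


def process_with_splitting(chunks, exceeds_memory, max_chunk_retries=3,
                           min_rows=16, sleep=time.sleep):
    """Process chunks in order, halving any chunk that trips the memory check."""
    pending = deque(chunks)
    processed_sizes = []
    while pending:
        chunk = pending.popleft()
        retries = 0  # assumption: retries are counted per dequeued chunk
        while exceeds_memory(chunk) and retries < max_chunk_retries:
            retries += 1
            split = max(min_rows, len(chunk) // 2)
            if split >= len(chunk):
                break  # added guard: the chunk cannot shrink any further
            pending.appendleft(chunk[split:])  # second half returns to the front
            chunk = chunk[:split]              # continue with the first half
            sleep(min(0.05 * retries, 0.2))    # linear backoff, capped at 200 ms
        processed_sizes.append(len(chunk))
    return processed_sizes


sizes = process_with_splitting(
    [list(range(100))],
    exceeds_memory=lambda c: len(c) > 30,  # stand-in for a real memory check
    sleep=lambda seconds: None,            # skip real sleeping in the demo
)
print(sizes)  # [25, 25, 25, 25]
```

Row order is preserved because the second half always goes back to the front of the queue. When retries run out, the chunk is processed at its current size even if it still exceeds the limit.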
Online Learning for Streaming
The streaming mode uses SGDRegressor for incremental learning (engine.py:214):
online_model = SGDRegressor(
random_state=self.config.random_seed,
max_iter=1,
tol=None,
learning_rate='invscaling'
)
Each chunk updates the model via partial_fit() (engine.py:245):
if len(X_chunk) > 0:
    online_model.partial_fit(X_chunk.to_numpy(dtype=float), y_chunk.to_numpy(dtype=float))
Optimization: Adjust batch size for model updates based on convergence requirements.
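To make the per-chunk update concrete, here is a deliberately simplified pure-Python sketch of incremental SGD with an inverse-scaling step size. It is not scikit-learn's implementation: it fits a single weight with no intercept and no regularization, `sgd_partial_fit` is a hypothetical function, and the `eta0` and `power_t` values are illustrative.

```python
def sgd_partial_fit(weight, t, xs, ys, eta0=0.01, power_t=0.25):
    """One pass over a chunk for the model y ≈ weight * x with squared loss.

    The step size follows an inverse-scaling schedule, eta = eta0 / t**power_t,
    where t counts every sample seen so far across all chunks.
    """
    for x, y in zip(xs, ys):
        t += 1
        eta = eta0 / (t ** power_t)
        error = weight * x - y
        weight -= eta * error * x  # gradient of 0.5 * error**2 w.r.t. weight
    return weight, t


# Synthetic data following y = 2x, fed in chunks of 25 rows.
xs = [i / 100 for i in range(100)]
ys = [2 * x for x in xs]
weight, t = 0.0, 0
for start in range(0, len(xs), 25):
    weight, t = sgd_partial_fit(weight, t, xs[start:start + 25], ys[start:start + 25])
print(weight)  # moves from 0.0 toward 2.0 as more chunks arrive
```

Because the model state (`weight`, `t`) is carried between calls, each chunk refines the previous estimate instead of starting over, which is the property that makes `partial_fit()` suitable for streaming.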
Monitoring in Production
Key Metrics
- Per-chunk latency: Should remain stable
import pandas as pd
chunks = pd.read_csv('artifacts/benchmarks/streaming_chunks.csv')
print(f"Mean latency: {chunks['latency_s'].mean():.3f}s")
print(f"Std latency: {chunks['latency_s'].std():.3f}s")
print(f"P95 latency: {chunks['latency_s'].quantile(0.95):.3f}s")
- Memory stability: No increasing trend
chunks['memory_delta'] = chunks['memory_after_mb'] - chunks['memory_before_mb']
if chunks['memory_delta'].mean() > 10:
    print("WARNING: Potential memory leak")
- Retry rate: Should be near zero
retry_rate = (chunks['retries'] > 0).mean()
print(f"Retry rate: {retry_rate*100:.1f}%")
Alerting Thresholds
- Latency P95 > 2× median: Performance degradation
- Memory exceeded > 10%: Undersized configuration
- Retry rate > 5%: Frequent memory pressure
- Bandwidth < 50 MB/s: I/O bottleneck
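The four thresholds can be evaluated together in one pass over the per-chunk records. The sketch below assumes the records have already been parsed into Python types (for example, memory_exceeded as a bool). `check_alerts` and the alert labels are hypothetical names. P95 is computed with the nearest-rank method, which can differ slightly from the linear interpolation that pandas `quantile` uses by default.

```python
import statistics


def check_alerts(chunk_rows, bandwidth_mb_s):
    """Return the names of the thresholds that the given records violate."""
    n = len(chunk_rows)
    latencies = sorted(row['latency_s'] for row in chunk_rows)
    median = statistics.median(latencies)
    # Nearest-rank P95: smallest value with at least 95% of samples at or below it.
    rank = max(1, (95 * n + 99) // 100)
    p95 = latencies[rank - 1]
    exceeded_rate = sum(1 for row in chunk_rows if row['memory_exceeded']) / n
    retry_rate = sum(1 for row in chunk_rows if row['retries'] > 0) / n

    alerts = []
    if p95 > 2 * median:
        alerts.append('latency_p95')
    if exceeded_rate > 0.10:
        alerts.append('memory_exceeded')
    if retry_rate > 0.05:
        alerts.append('retry_rate')
    if bandwidth_mb_s < 50:
        alerts.append('bandwidth')
    return alerts


# Placeholder records: 18 steady chunks and 2 slow chunks that needed a retry.
rows = [{'latency_s': 0.1, 'memory_exceeded': False, 'retries': 0} for _ in range(18)]
rows += [{'latency_s': 0.5, 'memory_exceeded': False, 'retries': 1} for _ in range(2)]
print(check_alerts(rows, bandwidth_mb_s=80))  # ['latency_p95', 'retry_rate']
```

Returning a list of labels keeps the check independent of any particular alerting system; the caller decides whether to log, page, or fail a run.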
Best Practices
- Always profile first: Use operator profiling to guide optimization
- Optimize the bottleneck: Focus on the dominant stage
- Test on representative data: Use production-scale samples
- Validate accuracy: Ensure optimizations don’t harm model quality
- Document baselines: Save reports before and after optimization
- Monitor continuously: Track metrics over time in production