Documentation Index
Fetch the complete documentation index at: https://mintlify.com/RaviTejaMedarametla/nba-data-preprocessing/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The pipeline generates detailed profiling artifacts that break down performance at the operator level, enabling fine-grained optimization and bottleneck identification.
Generated Profiling Artifacts
After running run_all(), the pipeline creates three main profiling outputs:
operator_profile.csv
Per-chunk operator-level timing breakdown located in output_dir/profiles/:
chunk_id,preprocess_s,feature_engineering_s,feature_selection_s,encode_scale_s,estimated_input_bandwidth_mb_s,input_bytes
1,0.012,0.018,0.008,0.007,45.3,2097152
2,0.011,0.019,0.007,0.008,47.1,2097152
3,0.013,0.017,0.008,0.007,43.8,2097152
Column Descriptions:
preprocess_s: Time spent in data cleaning and preprocessing
feature_engineering_s: Time spent building derived features
feature_selection_s: Time spent in multicollinearity detection and feature filtering
encode_scale_s: Time spent in one-hot encoding and scaling
estimated_input_bandwidth_mb_s: Calculated input data bandwidth (MB/s)
input_bytes: Raw input data size for the chunk
streaming_chunks.csv
Chunk-level latency, throughput, and memory observations in output_dir/benchmarks/:
chunk_id,rows,latency_s,throughput_rows_s,memory_before_mb,memory_after_mb,memory_exceeded,retries
1,128,0.045,2844.4,245.3,268.7,false,0
2,128,0.042,3047.6,268.7,289.1,false,0
See Benchmarking for complete field descriptions.
pipeline_report.json
Aggregate telemetry and operator profile summary in output_dir/reports/:
{
"streaming": {
"operator_profile_summary_s": {
"preprocess_s": 0.012,
"feature_engineering_s": 0.018,
"feature_selection_s": 0.008,
"encode_scale_s": 0.007
},
"telemetry": {
"cpu_percent_start": 12.3,
"cpu_percent_end": 45.7,
"process_memory_start_mb": 156.2,
"process_memory_end_mb": 342.8,
"rapl_energy_j": 23.45
}
}
}
Operator-Level Profiling
Implementation
The _profile_stream_chunk() method in engine.py:84-106 measures wall-clock time for each pipeline stage:
def _profile_stream_chunk(self, chunk: pd.DataFrame, rolling_state: Any):
stage_start = time.perf_counter()
cleaned = self.preprocessor.clean(chunk)
preprocess_s = time.perf_counter() - stage_start
stage_start = time.perf_counter()
featured = self.engineer.build_features_streaming(cleaned, rolling_state)
feature_s = time.perf_counter() - stage_start
stage_start = time.perf_counter()
filtered = self.engineer.drop_multicollinearity(featured)
select_s = time.perf_counter() - stage_start
stage_start = time.perf_counter()
x_chunk, y_chunk = self.engineer.encode_and_scale(filtered)
encode_s = time.perf_counter() - stage_start
return x_chunk, y_chunk, {
'preprocess_s': float(preprocess_s),
'feature_engineering_s': float(feature_s),
'feature_selection_s': float(select_s),
'encode_scale_s': float(encode_s),
}
Identifying Bottlenecks
Use the operator profile to identify dominant stages:
import pandas as pd
# Load operator profile
profile = pd.read_csv('artifacts/profiles/operator_profile.csv')
# Calculate mean time per operator
operator_means = profile[[
'preprocess_s',
'feature_engineering_s',
'feature_selection_s',
'encode_scale_s'
]].mean()
print("Operator breakdown:")
print(operator_means.sort_values(ascending=False))
Example output:
feature_engineering_s 0.0182
preprocess_s 0.0121
feature_selection_s 0.0078
encode_scale_s 0.0071
In this case, feature engineering is the bottleneck and should be the focus of optimization efforts.
Hardware Telemetry
HardwareMonitor Class
The HardwareMonitor class (monitor.py:16-76) provides fallback-safe hardware telemetry:
from pipeline.hardware import HardwareMonitor
monitor = HardwareMonitor()
# Capture snapshot before processing
start = monitor.snapshot()
# ... run pipeline ...
# Capture snapshot after processing
end = monitor.snapshot()
# Compare snapshots
telemetry = monitor.compare(start, end)
TelemetrySnapshot Structure
From monitor.py:8-13:
@dataclass
class TelemetrySnapshot:
cpu_percent: float
process_memory_mb: float
system_memory_percent: float
energy_uj: float | None # RAPL energy in microjoules
RAPL Energy Measurement
On Linux systems with Intel RAPL support, the monitor reads energy counters from:
/sys/class/powercap/intel-rapl*/energy_uj
Implementation in monitor.py:30-45:
def _discover_rapl_path(self) -> Path | None:
base = Path('/sys/class/powercap')
if not base.exists():
return None
for cand in base.glob('intel-rapl*/energy_uj'):
if cand.is_file():
return cand
return None
def _read_rapl_energy_uj(self) -> float | None:
if self._rapl_path is None:
return None
try:
return float(self._rapl_path.read_text(encoding='utf-8').strip())
except Exception:
return None
Energy calculation (engine.py:186, 299):
telemetry = self.hardware.compare(start_snapshot, end_snapshot)
# Fallback if RAPL unavailable
telemetry['fallback_energy_estimate_j'] = elapsed * 45.0 # batch
telemetry['fallback_energy_estimate_j'] = elapsed * 30.0 # streaming
energy = telemetry['rapl_energy_j'] if telemetry['rapl_energy_j'] is not None else telemetry['fallback_energy_estimate_j']
Assumptions:
- Batch mode: 45W average power
- Streaming mode: 30W average power
Memory Hierarchy and Cache Effects
The project does not capture PMU (Performance Monitoring Unit) counters directly, but practical signals help identify memory bottlenecks:
Cache Pressure Indicators
-
Increased
encode_scale_s with larger chunks
- One-hot encoding creates sparse matrices that stress cache
- Try reducing chunk size if this stage dominates
-
Rising end-to-end latency with stable throughput
- May indicate memory copy overhead
- Check for unnecessary DataFrame copies
-
Divergence between bandwidth estimate and throughput
- Could indicate storage or serialization bottlenecks
- Monitor
estimated_input_bandwidth_mb_s vs actual I/O speed
Bandwidth Estimation
From engine.py:282-283:
'input_bytes': int(chunk.memory_usage(index=True, deep=True).sum()),
'estimated_input_bandwidth_mb_s': float(
(chunk.memory_usage(index=True, deep=True).sum() / (1024 * 1024)) / max(elapsed, 1e-9)
)
Formula: bandwidth = input_bytes_mb / latency_s
Interpretation:
- High bandwidth (> 1000 MB/s): Good cache utilization
- Low bandwidth (< 100 MB/s): Potential I/O or memory bottleneck
- Decreasing bandwidth over time: Growing memory pressure
Reproducible Profiling Command
To generate profiling artifacts with controlled parameters:
cd "NBA Data Preprocessing/task"
python run_pipeline.py \
--input ../data/nba2k-full.csv \
--output-dir artifacts_profile \
--chunk-size 128 \
--batch-size 256 \
--max-memory-mb 512 \
--max-compute-units 0.5 \
--benchmark-runs 3 \
--random-seed 42
Key parameters:
--chunk-size: Size of streaming chunks (affects cache behavior)
--max-memory-mb: Memory limit for adaptive sizing
--max-compute-units: CPU constraint (0.0-1.0)
--benchmark-runs: Number of profiling iterations
--random-seed: Ensures reproducibility
Using Profiling Data for Optimization
Step 1: Identify Dominant Operator
python -c "import pandas as pd; \
df = pd.read_csv('artifacts/profiles/operator_profile.csv'); \
print(df[['preprocess_s', 'feature_engineering_s', 'feature_selection_s', 'encode_scale_s']].mean())"
Step 2: Analyze Chunk Size Impact
Compare operator times across different chunk sizes:
import pandas as pd
import matplotlib.pyplot as plt
profile = pd.read_csv('artifacts/profiles/operator_profile.csv')
chunks = pd.read_csv('artifacts/benchmarks/streaming_chunks.csv')
# Merge on chunk_id
merged = profile.merge(chunks[['chunk_id', 'chunk_size']], on='chunk_id')
# Plot encode time vs chunk size
merged.plot.scatter(x='chunk_size', y='encode_scale_s')
plt.xlabel('Chunk Size')
plt.ylabel('Encode & Scale Time (s)')
plt.title('Cache Pressure vs Chunk Size')
plt.savefig('cache_analysis.png')
Step 3: Investigate Memory Patterns
chunks = pd.read_csv('artifacts/benchmarks/streaming_chunks.csv')
# Find chunks with memory pressure
memory_issues = chunks[chunks['memory_exceeded'] == True]
print(f"Chunks exceeding memory limit: {len(memory_issues)}")
# Analyze retry patterns
if len(memory_issues) > 0:
print(f"Average retries: {memory_issues['retries'].mean():.2f}")
print(f"Max retries: {memory_issues['retries'].max()}")
Limitations
From the source documentation:
- No GPU profiling: GPU kernels are not currently measured
- User-space timing: All timing is wall-clock time in Python user space
- No quantization path: Direct quantization is not implemented
- Platform-dependent energy: RAPL counters only available on Intel Linux systems
- No PMU counters: Cache misses, branch mispredictions not captured
Next Steps