Overview
The HardwareMonitor class provides hardware telemetry with graceful fallback when psutil is unavailable. It also supports RAPL (Running Average Power Limit) energy measurements on Linux systems with Intel CPUs.
Class Definition
class HardwareMonitor:
    def __init__(self) -> None
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/hardware/monitor.py:16
On initialization, the constructor automatically detects which hardware monitoring capabilities are available:
- Attempts to import psutil for CPU and memory monitoring
- Discovers the RAPL energy interface at /sys/class/powercap/intel-rapl*/energy_uj
- Falls back gracefully if either is unavailable
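The detection steps above can be sketched as follows. This is a minimal illustration, not the code in monitor.py: the name detect_capabilities is hypothetical, and "first readable match wins" is an assumed policy for systems that expose several RAPL domains.

```python
import glob
import importlib
from types import ModuleType
from typing import Optional


def detect_capabilities() -> tuple[Optional[ModuleType], Optional[str]]:
    """Hypothetical sketch: probe for psutil and a readable RAPL energy file."""
    # Optional dependency: fall back to None when psutil is not installed
    try:
        psutil_mod: Optional[ModuleType] = importlib.import_module("psutil")
    except ImportError:
        psutil_mod = None

    # Glob pattern taken from the docs; assumed policy: first readable match wins
    rapl_path: Optional[str] = None
    for path in sorted(glob.glob("/sys/class/powercap/intel-rapl*/energy_uj")):
        try:
            with open(path) as fh:
                fh.read()  # May raise PermissionError for unprivileged users
        except OSError:
            continue
        rapl_path = path
        break

    return psutil_mod, rapl_path


# Example usage
psutil_mod, rapl_path = detect_capabilities()
print("psutil available:", psutil_mod is not None, "| RAPL file:", rapl_path)
```

Probing by actually reading the file, rather than only checking that it exists, catches the common case where the sysfs entry is present but not readable by the current user.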
Methods
snapshot
def snapshot(self) -> TelemetrySnapshot
Captures current hardware state.
Returns a TelemetrySnapshot dataclass containing:
cpu_percent (float): CPU usage percentage (0.0 if psutil unavailable)
process_memory_mb (float): Process memory in MB (0.0 if psutil unavailable)
system_memory_percent (float): System memory usage percentage (0.0 if psutil unavailable)
energy_uj (float | None): RAPL energy counter in microjoules (None if unavailable)
Example:
from pipeline.hardware import HardwareMonitor
monitor = HardwareMonitor()
# Capture snapshot
snapshot = monitor.snapshot()
# Note: psutil's first non-blocking cpu_percent reading may be a meaningless 0.0
print(f"CPU usage: {snapshot.cpu_percent:.1f}%")
print(f"Process memory: {snapshot.process_memory_mb:.2f} MB")
print(f"System memory: {snapshot.system_memory_percent:.1f}%")
if snapshot.energy_uj is not None:
    print(f"Energy counter: {snapshot.energy_uj:,.0f} μJ")
else:
    print("Energy monitoring not available")
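A minimal sketch of what snapshot() plausibly does with psutil, including the documented 0.0 fallbacks. It assumes psutil.virtual_memory().percent as the source of system_memory_percent and 1 MB = 1024 * 1024 bytes; neither is confirmed by this page. SnapshotSketch and take_snapshot are hypothetical names, and the energy value is passed in so the sketch stays focused on psutil.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SnapshotSketch:
    """Hypothetical stand-in with the same fields as TelemetrySnapshot."""

    cpu_percent: float
    process_memory_mb: float
    system_memory_percent: float
    energy_uj: float | None


def take_snapshot(energy_uj: float | None = None) -> SnapshotSketch:
    """Hypothetical sketch of snapshot(): psutil readings with 0.0 fallbacks."""
    try:
        import psutil
    except ImportError:
        # Documented fallback: 0.0 for every psutil-based metric
        return SnapshotSketch(0.0, 0.0, 0.0, energy_uj)
    return SnapshotSketch(
        # Non-blocking; measured since the previous call, so the first value may be 0.0
        cpu_percent=float(psutil.cpu_percent(interval=None)),
        # RSS in bytes -> megabytes (assumes 1 MB = 1024 * 1024 bytes)
        process_memory_mb=psutil.Process().memory_info().rss / (1024 * 1024),
        # Assumed source for system-wide memory usage
        system_memory_percent=float(psutil.virtual_memory().percent),
        energy_uj=energy_uj,
    )


# Example usage
print(take_snapshot())
```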
process_memory_mb
def process_memory_mb(self) -> float
Convenience method to get current process memory usage.
Returns the process memory (RSS) in megabytes (0.0 if psutil is unavailable).
Example:
from pipeline.hardware import HardwareMonitor
monitor = HardwareMonitor()
mem_before = monitor.process_memory_mb()
print(f"Memory before: {mem_before:.2f} MB")
# Do some memory-intensive work
data = [i**2 for i in range(10_000_000)]
mem_after = monitor.process_memory_mb()
print(f"Memory after: {mem_after:.2f} MB")
print(f"Memory increase: {mem_after - mem_before:.2f} MB")
compare
def compare(
    self,
    start: TelemetrySnapshot,
    end: TelemetrySnapshot
) -> dict[str, Any]
Compares two snapshots, returning the start and end values of each metric along with the energy consumed between them.
Parameters:
- start (TelemetrySnapshot, required): Snapshot captured at the start of the operation
- end (TelemetrySnapshot, required): Snapshot captured at the end of the operation
Returns a dictionary containing:
cpu_percent_start (float): Starting CPU usage
cpu_percent_end (float): Ending CPU usage
process_memory_start_mb (float): Starting process memory
process_memory_end_mb (float): Ending process memory
system_memory_percent_start (float): Starting system memory
system_memory_percent_end (float): Ending system memory
rapl_energy_j (float | None): Energy consumed in joules (None if unavailable or counter wrapped)
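The energy handling described above can be sketched as follows, assuming compare() subtracts the raw counters and divides by 10^6 to convert microjoules to joules. SnapshotSketch and compare_sketch are hypothetical stand-ins so the block runs without the pipeline package.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class SnapshotSketch:
    """Hypothetical stand-in with the same fields as TelemetrySnapshot."""

    cpu_percent: float
    process_memory_mb: float
    system_memory_percent: float
    energy_uj: float | None


def compare_sketch(start: SnapshotSketch, end: SnapshotSketch) -> dict[str, Any]:
    """Assumed logic: echo start/end values and convert the energy delta to joules."""
    energy_j: float | None = None
    if start.energy_uj is not None and end.energy_uj is not None:
        delta_uj = end.energy_uj - start.energy_uj
        # end < start means the counter wrapped, so energy_j stays None
        if delta_uj >= 0:
            energy_j = delta_uj / 1_000_000  # microjoules -> joules
    return {
        "cpu_percent_start": start.cpu_percent,
        "cpu_percent_end": end.cpu_percent,
        "process_memory_start_mb": start.process_memory_mb,
        "process_memory_end_mb": end.process_memory_mb,
        "system_memory_percent_start": start.system_memory_percent,
        "system_memory_percent_end": end.system_memory_percent,
        "rapl_energy_j": energy_j,
    }
```

Returning None on a wrap (instead of a negative number) forces callers to handle the missing value explicitly, which is why the examples on this page test `is not None` before computing power.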
Example:
from pipeline.hardware import HardwareMonitor
import time
monitor = HardwareMonitor()
# Benchmark operation
start_snapshot = monitor.snapshot()
start_time = time.perf_counter()
# Simulate work
result = sum(i**2 for i in range(10_000_000))
end_time = time.perf_counter()
end_snapshot = monitor.snapshot()
# Compare snapshots
telemetry = monitor.compare(start_snapshot, end_snapshot)
print(f"Execution time: {end_time - start_time:.3f}s")
print(f"CPU: {telemetry['cpu_percent_start']:.1f}% → {telemetry['cpu_percent_end']:.1f}%")
print(f"Memory: {telemetry['process_memory_start_mb']:.2f} MB → {telemetry['process_memory_end_mb']:.2f} MB")
print(f"Memory delta: {telemetry['process_memory_end_mb'] - telemetry['process_memory_start_mb']:.2f} MB")
if telemetry['rapl_energy_j'] is not None:
    print(f"Energy consumed: {telemetry['rapl_energy_j']:.3f} J")
    watts = telemetry['rapl_energy_j'] / (end_time - start_time)
    print(f"Average power: {watts:.2f} W")
else:
    print("Energy data not available")
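The average-power line in the example is the converted energy delta divided by wall-clock time. A worked instance with illustrative counter values (not measured data), counter readings $E$ in microjoules and elapsed time $\Delta t$ in seconds:

```latex
\Delta E = \frac{E_{\text{end}} - E_{\text{start}}}{10^{6}}\ \text{J}
         = \frac{7{,}200{,}000 - 1{,}200{,}000}{10^{6}}\ \text{J} = 6\ \text{J},
\qquad
\bar{P} = \frac{\Delta E}{\Delta t} = \frac{6\ \text{J}}{1.5\ \text{s}} = 4\ \text{W}
```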
TelemetrySnapshot
@dataclass
class TelemetrySnapshot:
    cpu_percent: float
    process_memory_mb: float
    system_memory_percent: float
    energy_uj: float | None
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/hardware/monitor.py:8
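Snapshot of hardware state at a specific point in time. As declared, the class uses a plain @dataclass rather than @dataclass(frozen=True), so fields are not enforced as immutable; treat instances as read-only.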
- cpu_percent: System-wide CPU usage percentage (0-100)
- process_memory_mb: Current process RSS (Resident Set Size) in megabytes
- system_memory_percent: System-wide memory usage percentage (0-100)
- energy_uj: RAPL energy counter value in microjoules; None if RAPL is unavailable
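An energy_uj file holds a single decimal integer: the cumulative energy counter in microjoules. A minimal sketch of reading it into the energy_uj field, with None on any failure; parse_energy_uj and read_energy_uj are hypothetical helpers, not functions from monitor.py.

```python
from __future__ import annotations

from pathlib import Path


def parse_energy_uj(text: str) -> float | None:
    """Parse the single integer stored in a RAPL energy_uj file."""
    try:
        return float(int(text.strip()))
    except ValueError:
        return None


def read_energy_uj(path: str | Path) -> float | None:
    """Hypothetical helper: read the cumulative RAPL counter, or None on failure."""
    try:
        return parse_energy_uj(Path(path).read_text())
    except OSError:
        # Missing file, or PermissionError when energy_uj is restricted to root
        return None


# Example usage: package 0 domain on a typical Intel Linux system
print(read_energy_uj("/sys/class/powercap/intel-rapl:0/energy_uj"))
```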
Usage Patterns
from pipeline.hardware import HardwareMonitor
import time
def benchmark_function(func, *args, **kwargs):
    monitor = HardwareMonitor()
    start_snap = monitor.snapshot()
    start_time = time.perf_counter()
    result = func(*args, **kwargs)
    end_time = time.perf_counter()
    end_snap = monitor.snapshot()
    telemetry = monitor.compare(start_snap, end_snap)
    elapsed = end_time - start_time
    print(f"Function: {func.__name__}")
    print(f"Duration: {elapsed:.3f}s")
    print(f"Memory: {telemetry['process_memory_end_mb']:.2f} MB")
    # Compare against None: a measured 0.0 J is falsy but still valid
    if telemetry['rapl_energy_j'] is not None:
        print(f"Energy: {telemetry['rapl_energy_j']:.3f} J")
        print(f"Power: {telemetry['rapl_energy_j'] / elapsed:.2f} W")
    return result
# Usage
result = benchmark_function(lambda: sum(range(10_000_000)))
Pipeline Integration
from pipeline.config import PipelineConfig
from pipeline.hardware import HardwareMonitor
from pipeline.ingestion import DataIngestor
config = PipelineConfig()
monitor = HardwareMonitor()
ingestor = DataIngestor()
# Monitor data loading
start = monitor.snapshot()
df = ingestor.load('nba_data.csv')
end = monitor.snapshot()
telemetry = monitor.compare(start, end)
print(f"Loaded {len(df)} rows")
print(f"Memory used: {telemetry['process_memory_end_mb'] - telemetry['process_memory_start_mb']:.2f} MB")
Resource-Constrained Execution
import gc
from pipeline.hardware import HardwareMonitor
monitor = HardwareMonitor()
MAX_MEMORY_MB = 1024
def process_chunk(chunk):
    # Placeholder: replace with your own processing function
    return chunk
def process_with_memory_check(data_chunks):
    results = []
    for i, chunk in enumerate(data_chunks):
        # Check memory before processing (always 0.0 without psutil, so the guard never fires)
        current_memory = monitor.process_memory_mb()
        if current_memory > MAX_MEMORY_MB:
            print(f"⚠ Memory limit exceeded: {current_memory:.2f} MB > {MAX_MEMORY_MB} MB")
            print("Forcing garbage collection...")
            gc.collect()  # Sleeping does not trigger collection; request it explicitly
            # Recheck; RSS may stay high because freed memory is not always returned to the OS
            current_memory = monitor.process_memory_mb()
            if current_memory > MAX_MEMORY_MB:
                print(f"❌ Cannot continue - memory still at {current_memory:.2f} MB")
                break
        # Process chunk
        result = process_chunk(chunk)
        results.append(result)
        print(f"Chunk {i}: {current_memory:.2f} MB")
    return results
Energy Profiling
from pipeline.hardware import HardwareMonitor
from pipeline.streaming.engine import RealTimePipelineRunner
from pipeline.config import PipelineConfig
import pandas as pd
config = PipelineConfig(benchmark_runs=5)
runner = RealTimePipelineRunner(config)
monitor = HardwareMonitor()
# Check if energy monitoring available
test_snap = monitor.snapshot()
if test_snap.energy_uj is None:
    print("⚠ RAPL energy monitoring not available on this system")
    print("Energy estimates will use fallback calculation")
else:
    print("✓ RAPL energy monitoring available")
# Run benchmarks
df = pd.read_csv('nba_data.csv')
benchmark_results = runner.benchmark(df)
# Extract energy data
for i, run in enumerate(benchmark_results['runs']):
    batch_energy = run['batch']['energy_estimate_j']
    stream_energy = run['streaming']['energy_estimate_j']
    print(f"Run {i+1}:")
    print(f"  Batch energy: {batch_energy:.2f} J")
    print(f"  Streaming energy: {stream_energy:.2f} J")
    print(f"  Difference: {stream_energy - batch_energy:+.2f} J")
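To compare modes across all runs rather than run by run, the per-run estimates can be averaged. A minimal sketch: summarize_energy is a hypothetical helper, and the input shape is inferred only from the keys the loop above reads ('runs', 'batch', 'streaming', 'energy_estimate_j').

```python
from statistics import mean
from typing import Any


def summarize_energy(benchmark_results: dict[str, Any]) -> dict[str, float]:
    """Average per-run energy estimates (assumes the 'runs' shape used above)."""
    runs = benchmark_results["runs"]
    batch = [run["batch"]["energy_estimate_j"] for run in runs]
    stream = [run["streaming"]["energy_estimate_j"] for run in runs]
    return {
        "batch_mean_j": mean(batch),
        "streaming_mean_j": mean(stream),
        "mean_difference_j": mean(stream) - mean(batch),
    }
```

Usage: `print(summarize_energy(benchmark_results))` after the benchmark call above.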
The following script reports which monitoring features are available on the current system:
from pipeline.hardware import HardwareMonitor
import sys
monitor = HardwareMonitor()
snapshot = monitor.snapshot()
print(f"Platform: {sys.platform}")
print(f"Python: {sys.version}")
print()
# Check available features
# (process memory is the more reliable signal: the first cpu_percent reading may be 0.0)
features = []
if snapshot.cpu_percent > 0 or snapshot.process_memory_mb > 0:
    features.append("✓ psutil (CPU/Memory)")
else:
    features.append("✗ psutil not available")
if snapshot.energy_uj is not None:
    features.append("✓ RAPL energy monitoring")
else:
    features.append("✗ RAPL not available (Linux + Intel CPU required)")
print("Hardware monitoring capabilities:")
for feature in features:
    print(f"  {feature}")
Notes
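- psutil dependency: Optional but recommended. Install with pip install psutil
- RAPL availability: Only on Linux with Intel CPUs, and requires read access to /sys/class/powercap/. On many current kernels, energy_uj is readable only by root (a mitigation for the PLATYPUS power side channel, CVE-2020-8694), so unprivileged processes may not be able to read it.
- Energy counter wrap: If energy_uj wraps around between snapshots (end < start), rapl_energy_j is None
- CPU percent: Uses psutil.cpu_percent(interval=None), which returns immediately without blocking. psutil computes this value from CPU times elapsed since the previous call, so it reflects the interval between calls rather than a single instant; per the psutil documentation, the first call may return a meaningless 0.0 that should be ignored.
- Process memory: Reports RSS (Resident Set Size), not virtual memory
- Fallback behavior: Returns 0.0 for all psutil-based metrics when psutil is unavailable (energy_uj is None independently when RAPL is unavailable), enabling graceful degradation
- Thread safety: Not thread-safe. Create separate instances for concurrent monitoring
- Sampling frequency: Call snapshot() as frequently as needed; there is no internal rate limiting
- Energy units: The RAPL counter is in microjoules (μJ); compare() converts the difference to joules (J)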
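compare() reports None when the counter wraps. If a number is needed anyway, one common alternative (not what compare() does) is to add back the counter's range, which the powercap interface exposes as max_energy_range_uj alongside energy_uj. A minimal sketch, assuming at most one wrap between the two readings; energy_delta_uj is a hypothetical name.

```python
def energy_delta_uj(start_uj: float, end_uj: float, max_range_uj: float) -> float:
    """Wrap-aware RAPL delta in microjoules (alternative to returning None)."""
    if end_uj >= start_uj:
        return end_uj - start_uj
    # Counter rolled over past max_range_uj back to 0; assumes exactly one wrap
    return (max_range_uj - start_uj) + end_uj


# Example: convert a wrapped delta to joules
print(energy_delta_uj(900_000.0, 50_000.0, 1_000_000.0) / 1_000_000)
```

The single-wrap assumption only holds if snapshots are taken more often than one full counter period, which depends on the counter range and the package's power draw.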