Documentation Index
Fetch the complete documentation index at: https://mintlify.com/huggingface/lerobot/llms.txt
Use this file to discover all available pages before exploring further.
LeRobot’s processor pipeline system transforms raw robot data through multiple steps. This guide shows you how to debug and inspect data at each stage of the pipeline.
Understanding the Pipeline
A processor pipeline is a sequence of ProcessorStep objects that transform data:
from lerobot.processor import DataProcessorPipeline
pipeline = DataProcessorPipeline(
steps=[
DeviceProcessorStep(device="cuda"),
NormalizerProcessorStep(stats=dataset_stats),
AddBatchDimensionProcessorStep(),
],
name="my_pipeline"
)
# Process data through full pipeline
output = pipeline(input_data)
See lerobot/processor/pipeline.py:254 for the full DataProcessorPipeline implementation.
Step-Through Debugging
Use step_through() to inspect data after each processing step:
from lerobot.processor import create_transition
# Create sample data
transition = create_transition(
observation={"camera": image, "state": joint_positions},
action=action_tensor,
)
# Step through pipeline
# Index 0 is reported as the initial state; index i > 0 is reported as the
# output of pipeline.steps[i - 1].
for i, intermediate_transition in enumerate(pipeline.step_through(transition)):
    print(f"\nAfter step {i}:")
    if i == 0:
        print(" Initial state")
    else:
        step = pipeline.steps[i - 1]
        print(f" Step: {step.__class__.__name__}")
    # Inspect observation
    obs = intermediate_transition.get('observation')
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                # tuple(...) prints "(480, 640, 3)" instead of
                # "torch.Size([480, 640, 3])", matching the example output.
                print(f" {key}: shape={tuple(value.shape)}, dtype={value.dtype}, device={value.device}")
    # Inspect action
    action = intermediate_transition.get('action')
    if isinstance(action, torch.Tensor):
        print(f" action: shape={tuple(action.shape)}, dtype={action.dtype}, device={action.device}")
Example output:
After step 0:
Initial state
camera: shape=(480, 640, 3), dtype=torch.uint8, device=cpu
state: shape=(7,), dtype=torch.float32, device=cpu
action: shape=(7,), dtype=torch.float32, device=cpu
After step 1:
Step: DeviceProcessorStep
camera: shape=(480, 640, 3), dtype=torch.uint8, device=cuda:0
state: shape=(7,), dtype=torch.float32, device=cuda:0
action: shape=(7,), dtype=torch.float32, device=cuda:0
After step 2:
Step: NormalizerProcessorStep
camera: shape=(480, 640, 3), dtype=torch.float32, device=cuda:0 # Normalized to [0,1]
state: shape=(7,), dtype=torch.float32, device=cuda:0 # Normalized by stats
action: shape=(7,), dtype=torch.float32, device=cuda:0 # Normalized by stats
Custom Hooks
Add hooks to inspect or modify data at specific pipeline stages:
def before_step_hook(step_idx: int, transition: dict):
    """Called before each step.

    Prints the name of the step about to run and warns about NaN values in
    every tensor stored directly in the transition or one level down inside
    a nested dict.

    Args:
        step_idx: Index of the step in ``pipeline.steps``.
        transition: The transition about to be passed to that step.
    """
    step_name = pipeline.steps[step_idx].__class__.__name__
    print(f"\n→ Before {step_name}")
    # Check for NaN values
    for key, value in transition.items():
        # Entries that are not dicts (e.g. a bare action tensor) are checked
        # under their own key instead of being skipped.
        entries = value.items() if isinstance(value, dict) else [(None, value)]
        for k, v in entries:
            if isinstance(v, torch.Tensor) and torch.isnan(v).any():
                label = key if k is None else f"{key}.{k}"
                print(f" ⚠️ NaN detected in {label}")
def after_step_hook(step_idx: int, transition: dict):
    """Called after each step.

    Prints the name of the step that just ran and, when the transition holds
    an action tensor, its mean and standard deviation.

    Args:
        step_idx: Index of the step in ``pipeline.steps``.
        transition: The transition returned by that step.
    """
    step_name = pipeline.steps[step_idx].__class__.__name__
    print(f"← After {step_name}")
    # Log statistics
    action = transition.get('action')
    if isinstance(action, torch.Tensor):
        # mean()/std() are only defined for floating-point tensors; cast
        # integer actions so the hook does not raise on them.
        values = action if action.is_floating_point() else action.float()
        print(f" Action stats: mean={values.mean():.4f}, std={values.std():.4f}")
# Create pipeline with hooks
pipeline = DataProcessorPipeline(
steps=[...],
before_step_hooks=[before_step_hook],
after_step_hooks=[after_step_hook]
)
Track how data shapes and types change through the pipeline:
from lerobot.configs.types import FeatureType, PolicyFeature, PipelineFeatureType
# Define input features
input_features = {
PipelineFeatureType.OBSERVATION: {
"observation.images.camera": PolicyFeature(type=FeatureType.IMAGE, shape=(3, 224, 224)),
"observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,)),
},
PipelineFeatureType.ACTION: {
"action": PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
},
}
# Transform through each step
# Feed the feature description through every step in order, printing the
# description each step produces.
current_features = input_features
for position, stage in enumerate(pipeline.steps):
    stage_name = stage.__class__.__name__
    print(f"\nStep {position}: {stage_name}")
    # Each step maps the incoming description to the one it will emit.
    current_features = stage.transform_features(current_features)
    for group, members in current_features.items():
        print(f" {group}:")
        for feature_name, spec in members.items():
            print(f" {feature_name}: shape={spec.shape}, type={spec.type}")
Example output:
Step 0: TokenizerProcessorStep
OBSERVATION:
observation.language_tokens: shape=(77,), type=TEXT
observation.language_attention_mask: shape=(77,), type=TEXT
observation.images.camera: shape=(3, 224, 224), type=IMAGE
observation.state: shape=(7,), type=STATE
ACTION:
action: shape=(7,), type=ACTION
Step 1: NormalizerProcessorStep
OBSERVATION:
observation.language_tokens: shape=(77,), type=TEXT
observation.language_attention_mask: shape=(77,), type=TEXT
observation.images.camera: shape=(3, 224, 224), type=IMAGE
observation.state: shape=(7,), type=STATE # Normalized
ACTION:
action: shape=(7,), type=ACTION # Normalized
Debugging Normalization
Inspect normalization statistics:
from lerobot.processor import NormalizerProcessorStep
# Pick the first NormalizerProcessorStep in the pipeline (None when absent).
normalizer = next(
    (candidate for candidate in pipeline.steps if isinstance(candidate, NormalizerProcessorStep)),
    None,
)
if normalizer:
    print("Normalization stats:")
    # mean/std are printed together when 'mean' is present, min/max when
    # 'min' is present.
    stat_groups = (("mean", ("mean", "std")), ("min", ("min", "max")))
    for key, stats in normalizer.stats.items():
        print(f"\n{key}:")
        for marker, names in stat_groups:
            if marker in stats:
                for stat in names:
                    print(f" {stat}: {stats[stat]}")
    # Check tensor stats (on device)
    print("\nTensor stats:")
    for key, per_key in normalizer._tensor_stats.items():
        print(f"\n{key}:")
        for stat_name, tensor in per_key.items():
            print(f" {stat_name}: device={tensor.device}, dtype={tensor.dtype}")
Debugging Device Placement
Verify tensors are on correct devices:
def check_device_placement(transition: dict, expected_device: str = "cuda"):
    """Recursively check all tensors are on expected device.

    Args:
        transition: Possibly nested dict; every ``torch.Tensor`` value found
            at any dict depth is checked.
        expected_device: Device given as a string (``"cuda"``, ``"cuda:0"``,
            ``"cpu"``) or a ``torch.device``. When no index is given, any
            device of that type is accepted.

    Returns:
        The list of issue descriptions (empty when every tensor matches).
        The same information is printed.
    """
    # Normalise once so "cuda", "cuda:0" and torch.device(...) all compare
    # correctly against a tensor's device.
    expected = torch.device(expected_device)
    issues = []

    def check_recursive(obj, path=""):
        if isinstance(obj, torch.Tensor):
            same_type = obj.device.type == expected.type
            # Only compare indices when the caller asked for a specific one.
            same_index = expected.index is None or obj.device.index == expected.index
            if not (same_type and same_index):
                issues.append(f"{path}: expected {expected_device}, got {obj.device}")
        elif isinstance(obj, dict):
            for key, value in obj.items():
                check_recursive(value, f"{path}.{key}" if path else key)

    check_recursive(transition)
    if issues:
        print("❌ Device placement issues:")
        for issue in issues:
            print(f" {issue}")
    else:
        print(f"✅ All tensors on {expected_device}")
    return issues
# Use in pipeline
for i, transition in enumerate(pipeline.step_through(data)):
if i > 0 and isinstance(pipeline.steps[i-1], DeviceProcessorStep):
check_device_placement(transition, pipeline.steps[i-1].device)
Saving Pipeline State
Save pipeline configuration and state for debugging:
from pathlib import Path
# Save pipeline
import json

pipeline.save_pretrained("debug_pipeline")
# Check saved files
save_dir = Path("debug_pipeline")
print("Saved files:")
for file in save_dir.iterdir():
    print(f" {file.name}")
    if file.suffix == ".json":
        # Pretty-print every JSON file so the saved settings can be read
        # directly; the encoding is explicit so the result does not depend
        # on the platform default.
        with open(file, encoding="utf-8") as f:
            config = json.load(f)
        print(json.dumps(config, indent=2))
Common Issues and Solutions
Issue: NaN Values After Normalization
Cause: Division by zero when std is 0
Solution: Check normalization stats:
for key, stats in normalizer.stats.items():
    if 'std' in stats:
        # as_tensor accepts lists, NumPy arrays and existing tensors without
        # the copy-construct warning torch.tensor() emits for tensor input.
        std = torch.as_tensor(stats['std'])
        if (std == 0).any():
            print(f"⚠️ Zero std in {key}: {std}")
            print(" This will cause NaN values!")
Issue: Device Mismatch Errors
Cause: Tensors on different devices
Solution: Add device processor early in pipeline:
pipeline = DataProcessorPipeline(
steps=[
DeviceProcessorStep(device="cuda"), # First step!
# ... other steps
]
)
Issue: Shape Mismatches
Cause: Step expecting different input shape
Solution: Use transform_features() to verify:
# Run only the feature-description transform of each step and stop at the
# first step that raises.
current_features = input_features
for stage in pipeline.steps:
    stage_name = stage.__class__.__name__
    print(f"Before {stage_name}:")
    print(f" Features: {current_features}")
    try:
        transformed = stage.transform_features(current_features)
    except Exception as e:
        print(f"❌ Error in {stage_name}: {e}")
        break
    current_features = transformed
Debugging Custom Processors
Add debug output to custom processor steps:
from lerobot.processor import ProcessorStep, ProcessorStepRegistry
from dataclasses import dataclass
@ProcessorStepRegistry.register("debug_processor")
@dataclass
class DebugProcessorStep(ProcessorStep):
    """Pass-through step that prints a summary of each transition it sees.

    With ``verbose`` enabled, it prints the transition's keys, then the
    shape, dtype and device of every top-level tensor and of every tensor
    found one level down inside dict values. The transition is always
    returned unchanged.
    """

    verbose: bool = True

    def __call__(self, transition):
        if not self.verbose:
            return transition

        print(f"\n=== {self.__class__.__name__} ===")
        print(f"Input keys: {transition.keys()}")
        for name, entry in transition.items():
            if isinstance(entry, torch.Tensor):
                print(f"{name}: {entry.shape}, {entry.dtype}, {entry.device}")
                continue
            if not isinstance(entry, dict):
                continue
            print(f"{name}: dict with {len(entry)} items")
            for sub_name, item in entry.items():
                if isinstance(item, torch.Tensor):
                    print(f" {sub_name}: {item.shape}, {item.dtype}, {item.device}")
        return transition

    def transform_features(self, features):
        # This step does not alter the data, so the description is unchanged.
        return features
# Use in pipeline
pipeline = DataProcessorPipeline(
steps=[
DebugProcessorStep(verbose=True),
DeviceProcessorStep(device="cuda"),
DebugProcessorStep(verbose=True),
NormalizerProcessorStep(stats=stats),
DebugProcessorStep(verbose=True),
]
)
Measure time spent in each step:
import time
timings = []
# Start timestamps keyed by step index, written by the before-step hook.
_step_started = {}


def start_timing_hook(step_idx: int, transition: dict):
    """Record when a step starts (registered as a before-step hook)."""
    _step_started[step_idx] = time.perf_counter()


def timing_hook(step_idx: int, transition: dict):
    """Append ``(step name, elapsed seconds)`` for the step that just ran.

    Registered as an after-step hook. Pairing it with ``start_timing_hook``
    means every step is measured, including the first one.
    """
    started = _step_started.pop(step_idx, None)
    if started is None:
        # No matching before-hook call was recorded; nothing to measure.
        return
    elapsed = time.perf_counter() - started
    step_name = pipeline.steps[step_idx].__class__.__name__
    timings.append((step_name, elapsed))


pipeline = DataProcessorPipeline(
    steps=[...],
    before_step_hooks=[start_timing_hook],
    after_step_hooks=[timing_hook],
)
# Run pipeline
output = pipeline(data)
# Print timings
# NOTE(review): GPU work may run asynchronously, so wall-clock numbers for
# CUDA steps may need a torch.cuda.synchronize() to be meaningful; confirm.
print("\nPipeline timings:")
total = sum(t for _, t in timings)
for step_name, elapsed in timings:
    # Guard against a zero total so the report never divides by zero.
    pct = 100 * elapsed / total if total else 0.0
    print(f"{step_name:30s}: {elapsed*1000:6.2f}ms ({pct:5.1f}%)")
print(f"{'Total':30s}: {total*1000:6.2f}ms")
API Reference
DataProcessorPipeline
See lerobot/processor/pipeline.py:254
step_through
(data) -> Iterable[EnvTransition]
Yield intermediate states after each processing step
__call__(data): Process data through the full pipeline
save_pretrained(save_directory): Save pipeline configuration and state
ProcessorStep
See lerobot/processor/pipeline.py:143
transform_features(features): Transform feature descriptions without processing actual data
state_dict(): Return processor state (e.g., normalization stats)
load_state_dict(state): Load processor state from a dictionary