Documentation Index
Fetch the complete documentation index at: https://mintlify.com/huggingface/lerobot/llms.txt
Use this file to discover all available pages before exploring further.
LeRobot’s processor system is designed to be extensible. This guide shows you how to create custom processing steps for your specific robot or task.
Processor Step Basics
All processor steps inherit from ProcessorStep and must implement two methods:
__call__(transition): Transform the data
transform_features(features): Describe how features change
from lerobot.processor import ProcessorStep, ProcessorStepRegistry
from lerobot.processor.core import EnvTransition
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from dataclasses import dataclass
@ProcessorStepRegistry.register("my_custom_processor")
@dataclass
class MyCustomProcessorStep(ProcessorStep):
    """Minimal processor step template.

    Both required methods are pass-throughs: the transition and the
    feature descriptions are returned exactly as received. Replace the
    bodies with your own logic.
    """

    # Configuration parameter (not used by this pass-through template).
    scale_factor: float = 1.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Return the transition; insert your data transformation here."""
        return transition

    def transform_features(
        self,
        features: dict[PipelineFeatureType, dict[str, PolicyFeature]],
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """Return the feature descriptions; unchanged since the data is unchanged."""
        return features
See lerobot/processor/pipeline.py:143 for the base ProcessorStep class.
Specialized Processor Steps
LeRobot provides specialized base classes for common use cases:
ObservationProcessorStep
Process only observations:
from lerobot.processor import ObservationProcessorStep
@ProcessorStepRegistry.register("my_obs_processor")
@dataclass
class MyObsProcessorStep(ObservationProcessorStep):
    """Add an ``observation.velocity`` entry derived from ``observation.state``."""

    def observation(self, observation: dict) -> dict:
        """Return a shallow copy of the observation with a velocity field added.

        Observations that carry no ``"observation.state"`` entry are
        returned as an unmodified copy.
        """
        processed = observation.copy()
        if "observation.state" in processed:
            # NOTE: compute_velocity is not defined in this snippet; supply
            # your own helper (e.g. a finite difference against the
            # previous state).
            processed["observation.velocity"] = compute_velocity(
                processed["observation.state"]
            )
        return processed

    def transform_features(self, features):
        """Declare the velocity feature next to the state feature.

        The feature mapping is keyed by ``PipelineFeatureType``, so the
        new entry goes into the ``OBSERVATION`` group. Its type and shape
        are copied from ``observation.state`` instead of being hard-coded
        (assumes velocity has the same dimensionality as the state). The
        caller's mapping is not mutated.
        """
        obs_features = features.get(PipelineFeatureType.OBSERVATION, {})
        state_feature = obs_features.get("observation.state")
        if state_feature is None:
            # observation() only emits a velocity when a state is present.
            return features

        updated = dict(features)
        updated[PipelineFeatureType.OBSERVATION] = {
            **obs_features,
            "observation.velocity": PolicyFeature(
                type=state_feature.type,
                shape=state_feature.shape,
            ),
        }
        return updated
ActionProcessorStep
Process only actions:
import torch

from lerobot.processor import ActionProcessorStep


@ProcessorStepRegistry.register("my_action_processor")
@dataclass
class MyActionProcessorStep(ActionProcessorStep):
    """Clip every action element to a safe range."""

    # Inclusive clipping bounds; the defaults keep the original [-1, 1] range.
    min_value: float = -1.0
    max_value: float = 1.0

    def action(self, action: torch.Tensor) -> torch.Tensor:
        """Return the action clamped element-wise to [min_value, max_value]."""
        return torch.clamp(action, self.min_value, self.max_value)

    def transform_features(self, features):
        """Return the features unchanged; clamping preserves the shape."""
        return features
RobotActionProcessorStep
Process robot action dictionaries:
from lerobot.processor import RobotActionProcessorStep
@ProcessorStepRegistry.register("my_robot_action_processor")
@dataclass
class MyRobotActionProcessorStep(RobotActionProcessorStep):
    """Limit the magnitude of every ``*.vel`` entry of a robot action dict."""

    # Symmetric limit applied to velocity entries: [-max_velocity, max_velocity].
    max_velocity: float = 1.0

    def action(self, action: dict[str, float]) -> dict[str, float]:
        """Return a new dict whose ``.vel`` values are clamped.

        Entries whose key does not end in ``".vel"`` are passed through
        untouched. The clamp uses the builtins ``min``/``max`` so no
        NumPy import is needed and plain floats stay plain floats.
        """
        limit = self.max_velocity
        return {
            key: min(max(value, -limit), limit) if key.endswith(".vel") else value
            for key, value in action.items()
        }

    def transform_features(self, features):
        """Return the features unchanged; clamping adds or removes nothing."""
        return features
Example: Image Preprocessing Step
Create a step that crops and resizes images:
import torch
import torchvision.transforms.functional as TF
from lerobot.processor import ObservationProcessorStep
from lerobot.utils.constants import OBS_IMAGES
@ProcessorStepRegistry.register("crop_resize_processor")
@dataclass
class CropResizeProcessorStep(ObservationProcessorStep):
    """Crop every image observation to a box, then resize it."""

    crop_box: tuple[int, int, int, int]  # (x, y, width, height)
    output_size: tuple[int, int]  # (height, width)

    def observation(self, observation: dict) -> dict:
        """Return a shallow copy with every ``OBS_IMAGES`` entry cropped and resized.

        Images are indexed on their last two axes, so both batched
        ``(B, C, H, W)`` and unbatched ``(C, H, W)`` tensors work.
        """
        processed = observation.copy()
        x, y, w, h = self.crop_box
        image_prefix = f"{OBS_IMAGES}."

        for key, img in observation.items():
            if not key.startswith(image_prefix):
                continue
            cropped = img[..., y : y + h, x : x + w]
            processed[key] = TF.resize(cropped, self.output_size)
        return processed

    def transform_features(self, features):
        """Describe the resized image features without mutating the input.

        The feature mapping is keyed by ``PipelineFeatureType``; image
        features live in the ``OBSERVATION`` group. Each image feature
        keeps its own type and leading dimensions (assumed to be the
        channel axis of a ``(C, H, W)`` shape) and only the trailing
        two dimensions are replaced by ``output_size``.
        """
        obs_features = features.get(PipelineFeatureType.OBSERVATION)
        if not obs_features:
            return features

        image_prefix = f"{OBS_IMAGES}."
        resized = {
            name: (
                PolicyFeature(
                    type=feature.type,
                    shape=(*feature.shape[:-2], *self.output_size),
                )
                if name.startswith(image_prefix)
                else feature
            )
            for name, feature in obs_features.items()
        }
        # Build fresh outer and inner dicts so the caller's mapping is untouched.
        return {**features, PipelineFeatureType.OBSERVATION: resized}


# Usage
processor = CropResizeProcessorStep(
    crop_box=(100, 100, 400, 400),
    output_size=(224, 224),
)
Example: Delta Action Converter
Convert absolute actions to delta (relative) actions:
from lerobot.processor import ActionProcessorStep
import torch
@ProcessorStepRegistry.register("absolute_to_delta_action")
@dataclass
class AbsoluteToDeltaActionStep(ActionProcessorStep):
    """Turn a stream of absolute actions into per-step differences."""

    # Last absolute action seen; None until the first call after a reset.
    _prev_action: torch.Tensor | None = None

    def action(self, action: torch.Tensor) -> torch.Tensor:
        """Return ``action`` minus the previously seen action.

        On the first call (nothing remembered yet) a copy of the action
        itself is returned. The current action is always remembered for
        the next call.
        """
        previous = self._prev_action
        self._prev_action = action.clone()
        if previous is None:
            return action.clone()
        return action - previous

    def reset(self):
        """Forget the remembered action."""
        self._prev_action = None

    def transform_features(self, features):
        """Return the features unchanged; a delta has the same shape."""
        return features


# Note: Call reset() at episode boundaries
processor = AbsoluteToDeltaActionStep()
for episode in episodes:
    processor.reset()
    for transition in episode:
        processed = processor(transition)
Stateful Processors
Processors can keep internal state between calls and make it saveable and restorable by implementing state_dict() and load_state_dict():
from lerobot.processor import ProcessorStep
import torch
@ProcessorStepRegistry.register("running_mean_processor")
@dataclass
class RunningMeanProcessorStep(ProcessorStep):
    """Subtract a running mean from ``observation.state``.

    The mean is updated with each incoming state *before* it is
    subtracted, so the very first output is all zeros.
    """

    # Number of states folded into the running mean so far.
    _count: int = 0
    # Running mean; None until the first state has been seen.
    _mean: torch.Tensor | None = None

    def __call__(self, transition):
        """Update the running mean and return a transition with the centered state.

        The incoming transition and its observation dict are copied
        (shallowly) rather than modified in place, matching the
        copy-before-modify convention of the other steps. Transitions
        without ``"observation.state"`` are returned as-is.
        """
        obs = transition["observation"]
        if "observation.state" not in obs:
            return transition

        state = obs["observation.state"]
        if self._mean is None:
            self._mean = state.clone()
            self._count = 1
        else:
            self._count += 1
            # Incremental mean: m_k = m_{k-1} + (x_k - m_{k-1}) / k
            self._mean = self._mean + (state - self._mean) / self._count

        centered_obs = obs.copy()
        centered_obs["observation.state"] = state - self._mean
        new_transition = transition.copy()
        new_transition["observation"] = centered_obs
        return new_transition

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Return the mean and count as tensors, or ``{}`` if nothing was seen yet."""
        if self._mean is None:
            return {}
        return {
            "mean": self._mean,
            "count": torch.tensor(self._count),
        }

    def load_state_dict(self, state: dict[str, torch.Tensor]):
        """Restore mean and count; a dict without ``"mean"`` leaves the step untouched."""
        if "mean" in state:
            self._mean = state["mean"]
            self._count = int(state["count"].item())

    def transform_features(self, features):
        """Return the features unchanged; centering preserves the shape."""
        return features
Accessing Other Transition Components
Processors can access other parts of the transition via self.transition:
from lerobot.processor import RobotActionProcessorStep
from lerobot.processor.core import TransitionKey
@ProcessorStepRegistry.register("action_based_on_obs")
@dataclass
class ActionBasedOnObsStep(RobotActionProcessorStep):
    """Scale the action down when the observed force is too high."""

    # Force reading above which the action is scaled down.
    max_force: float = 10.0
    # Multiplier applied to every action value once max_force is exceeded.
    reduction_scale: float = 0.5

    def action(self, action: dict) -> dict:
        """Return the action, scaled by ``reduction_scale`` if the force exceeds ``max_force``.

        The observation is read from the current transition via
        ``self.transition``. If there is no observation, or it has no
        ``"force_sensor"`` entry, the action is returned unchanged.
        """
        observation = self.transition.get(TransitionKey.OBSERVATION)
        if not observation or "force_sensor" not in observation:
            return action

        if observation["force_sensor"] > self.max_force:
            return {k: v * self.reduction_scale for k, v in action.items()}
        return action

    def transform_features(self, features):
        """Return the features unchanged; scaling preserves keys and shapes."""
        return features
See lerobot/processor/pipeline.py:156 for the transition property.
Configuration Parameters
Use dataclass fields for configuration:
from dataclasses import dataclass, field
@ProcessorStepRegistry.register("configurable_processor")
@dataclass
class ConfigurableProcessorStep(ProcessorStep):
    """Apply ``value * scale + offset`` to selected observation entries."""

    # Required parameter
    scale: float
    # Optional with default
    offset: float = 0.0
    # Complex default (use field)
    feature_names: list[str] = field(default_factory=lambda: ["state"])
    # Internal state (not serialized)
    _cache: dict = field(default_factory=dict, init=False, repr=False)

    def __call__(self, transition):
        """Return a transition whose listed observation entries are rescaled.

        The transition and its observation dict are shallow-copied before
        being modified, matching the copy-before-modify convention of the
        other steps. Names in ``feature_names`` that are missing from the
        observation are skipped.
        """
        new_observation = transition["observation"].copy()
        for name in self.feature_names:
            if name in new_observation:
                new_observation[name] = new_observation[name] * self.scale + self.offset

        new_transition = transition.copy()
        new_transition["observation"] = new_observation
        return new_transition

    def get_config(self) -> dict:
        """Return the serializable configuration.

        ``feature_names`` is copied so that editing the returned config
        cannot alter this step's own list.
        """
        return {
            "scale": self.scale,
            "offset": self.offset,
            "feature_names": list(self.feature_names),
        }

    def transform_features(self, features):
        """Return the features unchanged; rescaling preserves the shape."""
        return features
Registration and Discovery
Register your processor to make it discoverable:
from lerobot.processor import ProcessorStepRegistry
# Option 1: register at class-definition time with the decorator
@ProcessorStepRegistry.register("my_processor")
class MyProcessorStep(ProcessorStep):
    ...


# Option 2: define the class first, then register it explicitly.
# (Options 1 and 2 are alternatives -- pick one.)
class MyProcessorStep(ProcessorStep):
    ...


ProcessorStepRegistry.register("my_processor")(MyProcessorStep)

# Discovery: list the names of all registered steps
all_processors = ProcessorStepRegistry.list()
print(all_processors)  # [..., 'my_processor', ...]

# Instantiation by name: look the class up, then construct it
processor_class = ProcessorStepRegistry.get("my_processor")
processor = processor_class(scale=2.0)
Testing Custom Processors
Always test your processors thoroughly:
import pytest
import torch
from lerobot.processor import create_transition
def test_my_processor():
    """Check that the state key survives both processing and feature transformation."""
    state_key = "observation.state"

    # Build a transition with a 7-element state and a 7-element action.
    transition = create_transition(
        observation={state_key: torch.randn(7)},
        action=torch.randn(7),
    )

    processor = MyProcessorStep(scale=2.0)
    output = processor(transition)
    assert state_key in output["observation"]

    # transform_features must keep the state feature in the OBSERVATION group.
    from lerobot.configs.types import FeatureType, PolicyFeature, PipelineFeatureType

    state_feature = PolicyFeature(type=FeatureType.STATE, shape=(7,))
    input_features = {PipelineFeatureType.OBSERVATION: {state_key: state_feature}}
    output_features = processor.transform_features(input_features)
    assert state_key in output_features[PipelineFeatureType.OBSERVATION]
def test_stateful_processor():
    """Check that the running-mean state survives a state_dict round-trip."""
    num_steps = 10
    processor = RunningMeanProcessorStep()

    # Feed several transitions so the step accumulates a mean and a count.
    for _ in range(num_steps):
        transition = create_transition(
            observation={"observation.state": torch.randn(7)}
        )
        processor(transition)

    # Save state
    state = processor.state_dict()
    assert "mean" in state
    assert "count" in state

    # Create a fresh processor and load the saved state into it
    new_processor = RunningMeanProcessorStep()
    new_processor.load_state_dict(state)

    # Both halves of the state must transfer: the mean and the sample count.
    assert torch.allclose(processor._mean, new_processor._mean)
    assert new_processor._count == processor._count == num_steps
Integration with Pipelines
Use your custom processor in a pipeline:
from lerobot.processor import DataProcessorPipeline, DeviceProcessorStep
# MyCustomProcessorStep declares its parameter as `scale_factor`, so that
# is the keyword the constructor accepts.
pipeline = DataProcessorPipeline(
    steps=[
        MyCustomProcessorStep(scale_factor=2.0),
        DeviceProcessorStep(device="cuda"),
        # ... other steps
    ],
    name="custom_pipeline",
)

# Save pipeline with custom processor
pipeline.save_pretrained("my_pipeline")

# Load pipeline (requires custom processor to be registered)
loaded = DataProcessorPipeline.from_pretrained("my_pipeline")
Best Practices
1. Keep Processors Simple
Each processor should do one thing well:
# Good: Single responsibility
@ProcessorStepRegistry.register("crop_image")
class CropImageStep(ObservationProcessorStep):
    """Good: a step with a single responsibility."""

    def observation(self, obs):
        # Only crop
        ...


@ProcessorStepRegistry.register("process_all")
class ProcessAllStep(ProcessorStep):
    """Bad: one step carrying many responsibilities."""

    def __call__(self, transition):
        # Crop, resize, normalize, augment...
        ...
2. Handle Edge Cases
def observation(self, observation):
    """Template showing the edge-case checks to run before processing.

    Returns a shallow copy of the observation. Raises ValueError when
    ``"observation.state"`` is present but not one-dimensional.
    """
    processed = observation.copy()
    state = processed.get("observation.state") if "observation.state" in processed else None

    # Nothing to do when the expected field is absent.
    if "observation.state" not in processed:
        return processed

    # Validate tensor properties before touching the data.
    rank = state.ndim
    if rank != 1:
        raise ValueError(f"Expected 1D state, got {state.ndim}D")

    # An empty state needs no processing either.
    if state.shape[0] == 0:
        return processed

    # Your processing...
    return processed
3. Document Behavior
@ProcessorStepRegistry.register("my_processor")
@dataclass
class MyProcessorStep(ObservationProcessorStep):
    """Process observations by doing X.

    This processor expects:
        - observation.state: (N,) tensor of joint positions
        - observation.images.camera: (B, C, H, W) image tensor

    It produces:
        - observation.state: (N,) tensor (unchanged)
        - observation.images.camera: (B, C, H', W') resized image
        - observation.velocity: (N,) tensor of joint velocities

    Args:
        output_size: Target image size (H, W)

    Example:
        >>> processor = MyProcessorStep(output_size=(224, 224))
        >>> output = processor(transition)
    """

    output_size: tuple[int, int]
API Reference
ProcessorStep
See lerobot/processor/pipeline.py:143
__call__(transition) -> EnvTransition
Process environment transition
transform_features(features)
Transform feature descriptions
state_dict()
Return serializable state
load_state_dict(state)
Load state from dictionary
reset()
Reset internal state (for episodic processing)
ProcessorStepRegistry
See lerobot/processor/pipeline.py:59
register(name)
Register a processor step class
get(name)
Get processor class by name
list()
List all registered processor names