Documentation Index
Fetch the complete documentation index at: https://mintlify.com/huggingface/lerobot/llms.txt
Use this file to discover all available pages before exploring further.
The Processor API provides modular pipelines for transforming data between robots, policies, and environments.
Overview
LeRobot uses processor pipelines to:
- Normalize/unnormalize data
- Convert between data formats (numpy ↔ torch)
- Apply transformations (e.g., delta actions)
- Move data between devices (CPU/GPU)
- Rename observation keys
- Add batch dimensions
Type Definitions
from lerobot.processor import (
RobotObservation, # Dict of robot sensor data
RobotAction, # Dict of robot commands
PolicyAction, # Tensor or dict from policy
EnvAction, # Numpy array for environment
)
Location: src/lerobot/processor/core.py
Core Pipeline Classes
ProcessorStep
Base class for all processing steps.
from lerobot.processor import ProcessorStep
class ProcessorStep:
def __call__(self, data: Any) -> Any:
"""Transform input data."""
pass
DataProcessorPipeline
Generic pipeline for chaining processing steps.
from lerobot.processor import DataProcessorPipeline
pipeline = DataProcessorPipeline([
NormalizerProcessorStep(stats),
DeviceProcessorStep(device="cuda"),
])
processed_data = pipeline(raw_data)
Built-in Processor Steps
NormalizerProcessorStep
Normalize data using dataset statistics.
from lerobot.processor import NormalizerProcessorStep
normalizer = NormalizerProcessorStep(
stats=dataset.meta.stats,
features={
"observation.state": {...},
"action": {...},
},
)
normalized = normalizer(data)
Parameters:
- `stats`: Dataset statistics with mean and std for each feature.
- `features`: Feature definitions specifying which keys to normalize.
UnnormalizerProcessorStep
Reverse normalization.
from lerobot.processor import UnnormalizerProcessorStep
unnormalizer = UnnormalizerProcessorStep(
stats=dataset.meta.stats,
features={"action": {...}},
)
action = unnormalizer(normalized_action)
DeviceProcessorStep
Move tensors to a specific device.
from lerobot.processor import DeviceProcessorStep
to_cuda = DeviceProcessorStep(device="cuda")
data_on_gpu = to_cuda(data_on_cpu)
Parameters:
- `device`: Device string, e.g. "cpu", "cuda", "cuda:0".
MapDeltaActionToRobotActionStep
Convert delta actions to absolute actions.
from lerobot.processor import MapDeltaActionToRobotActionStep
delta_to_absolute = MapDeltaActionToRobotActionStep()
# Input: (delta_action, current_observation)
absolute_action = delta_to_absolute((delta_action, robot_obs))
RenameObservationsProcessorStep
Rename observation keys.
from lerobot.processor import RenameObservationsProcessorStep
renamer = RenameObservationsProcessorStep(
rename_map={
"observation.images.camera1": "observation.images.front",
}
)
renamed_obs = renamer(observation)
Parameters:
- `rename_map`: Mapping from old keys to new keys.
AddBatchDimensionProcessorStep
Add a batch dimension to tensors.
from lerobot.processor import AddBatchDimensionProcessorStep
add_batch = AddBatchDimensionProcessorStep()
# (C, H, W) -> (1, C, H, W)
batched = add_batch(unbatched_data)
Factory Functions
make_default_processors
from lerobot.processor import make_default_processors
robot_obs_processor, robot_action_processor, teleop_action_processor = make_default_processors()
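Create default processors for robot data. Returns three pipelines, in this order:
- `robot_obs_processor`: Pipeline for processing robot observations.
- `robot_action_processor`: Pipeline for processing robot actions.
- `teleop_action_processor`: Pipeline for processing teleoperation actions.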
make_default_robot_observation_processor
from lerobot.processor import make_default_robot_observation_processor
obs_processor = make_default_robot_observation_processor()
processed_obs = obs_processor(robot.get_observation())
make_default_robot_action_processor
from lerobot.processor import make_default_robot_action_processor
action_processor = make_default_robot_action_processor()
processed_action = action_processor((action, observation))
Specialized Pipelines
PolicyProcessorPipeline
Pipeline for policy input/output processing.
from lerobot.processor import PolicyProcessorPipeline
from lerobot.policies.factory import make_pre_post_processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=config,
dataset_stats=dataset.meta.stats,
)
# Preprocessing pipeline
processed_obs = preprocessor(observation)
# Postprocessing pipeline
action = policy.select_action(processed_obs)
final_action = postprocessor(action)
RobotProcessorPipeline
Pipeline for robot-specific processing.
from lerobot.processor import RobotProcessorPipeline
robot_pipeline = RobotProcessorPipeline([
# Custom steps for robot
])
processed = robot_pipeline(robot_data)
Usage Examples
Training Pipeline
from lerobot.datasets import LeRobotDataset
from lerobot.policies import make_policy
from lerobot.policies.factory import make_pre_post_processors
from torch.utils.data import DataLoader
# Load dataset
dataset = LeRobotDataset("lerobot/pusht")
# Create policy
policy = make_policy(config, dataset.meta)
# Create preprocessor
preprocessor, _ = make_pre_post_processors(
policy_cfg=config,
dataset_stats=dataset.meta.stats,
)
# Setup dataloader
dataloader = DataLoader(dataset, batch_size=32)
# Training loop
for batch in dataloader:
# Preprocess batch
batch = preprocessor(batch)
# Forward pass
loss, info = policy.forward(batch)
loss.backward()
optimizer.step()
Inference Pipeline
from lerobot.robots import make_robot_from_config
from lerobot.policies import make_policy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import make_default_robot_action_processor
# Load components
robot = make_robot_from_config(robot_config)
policy = make_policy(cfg=None, pretrained_path="lerobot/diffusion_pusht")
# Create processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy.config,
pretrained_path="lerobot/diffusion_pusht",
)
robot_action_processor = make_default_robot_action_processor()
# Inference loop
robot.connect()
policy.reset()
for step in range(100):
# Get observation
observation = robot.get_observation()
# Policy inference
observation = preprocessor(observation)
action = policy.select_action(observation)
action = postprocessor(action)
# Process for robot
robot_action = robot_action_processor((action, observation))
# Send to robot
robot.send_action(robot_action)
robot.disconnect()
Custom Processor Step
from lerobot.processor import ProcessorStep
import torch
class CustomProcessorStep(ProcessorStep):
def __init__(self, scale: float):
self.scale = scale
def __call__(self, data: dict) -> dict:
# Scale all action values
if "action" in data:
data["action"] = data["action"] * self.scale
return data
# Use in pipeline
from lerobot.processor import DataProcessorPipeline
pipeline = DataProcessorPipeline([
CustomProcessorStep(scale=2.0),
DeviceProcessorStep(device="cuda"),
])
processed = pipeline(data)
Delta Action Processing
from lerobot.processor import (
MapTensorToDeltaActionDictStep,
MapDeltaActionToRobotActionStep,
)
# Convert policy tensor output to delta action dict
tensor_to_delta = MapTensorToDeltaActionDictStep(
action_names=["x", "y", "z", "gripper"]
)
# Convert delta action to absolute action
delta_to_absolute = MapDeltaActionToRobotActionStep()
# Pipeline
policy_output = policy.select_action(observation) # Tensor
delta_action = tensor_to_delta(policy_output) # Dict
robot_obs = robot.get_observation()
absolute_action = delta_to_absolute((delta_action, robot_obs))
Observation Renaming
from lerobot.processor import RenameObservationsProcessorStep
# Robot has different camera names than policy expects
renamer = RenameObservationsProcessorStep(
rename_map={
"observation.images.webcam": "observation.images.top",
"observation.joint_pos": "observation.state",
}
)
robot_obs = robot.get_observation()
renamed_obs = renamer(robot_obs)
# Now compatible with policy trained on different names
Processor Registry
Register custom processor steps:
from lerobot.processor import ProcessorStepRegistry
registry = ProcessorStepRegistry()
@registry.register("my_custom_step")
class MyCustomStep(ProcessorStep):
def __call__(self, data):
return data
# Use in configuration
step = registry.create("my_custom_step", config={})
See Also