Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/huggingface/lerobot/llms.txt

Use this file to discover all available pages before exploring further.

The Processor API provides modular pipelines for transforming data between robots, policies, and environments.

Overview

LeRobot uses processor pipelines to:
  • Normalize/unnormalize data
  • Convert between data formats (numpy ↔ torch)
  • Apply transformations (e.g., delta actions)
  • Move data between devices (CPU/GPU)
  • Rename observation keys
  • Add batch dimensions

Type Definitions

from lerobot.processor import (
    RobotObservation,  # Dict of robot sensor data
    RobotAction,       # Dict of robot commands
    PolicyAction,      # Tensor or dict from policy
    EnvAction,         # Numpy array for environment
)
Location: src/lerobot/processor/core.py

Core Pipeline Classes

ProcessorStep

Base class for all processing steps.
from lerobot.processor import ProcessorStep

class ProcessorStep:
    """Interface sketch for a single processing step.

    A step is a callable object: it receives one piece of data and hands back
    the transformed result. This placeholder implementation performs no
    transformation and returns ``None``.
    """

    def __call__(self, data: Any) -> Any:
        """Transform ``data`` and return the result (no-op placeholder here)."""
        return None

DataProcessorPipeline

Generic pipeline for chaining processing steps.
from lerobot.processor import (
    DataProcessorPipeline,
    DeviceProcessorStep,
    NormalizerProcessorStep,
)

# Chain two steps: normalization, then transfer to the GPU.
# `stats` holds the dataset statistics (e.g. `dataset.meta.stats`).
pipeline = DataProcessorPipeline([
    NormalizerProcessorStep(stats),
    DeviceProcessorStep(device="cuda"),
])

# Calling the pipeline runs the input through the chained steps.
processed_data = pipeline(raw_data)

Built-in Processor Steps

NormalizerProcessorStep

Normalize data using dataset statistics.
from lerobot.processor import NormalizerProcessorStep

# Configure the step with the dataset statistics and the features to normalize.
# Each `{...}` is a placeholder for that feature's definition.
normalizer = NormalizerProcessorStep(
    stats=dataset.meta.stats,
    features={
        "observation.state": {...},
        "action": {...},
    },
)

# Apply the step to the input data.
normalized = normalizer(data)
Parameters:
  • stats (dict, required): Dataset statistics with mean and std for each feature.
  • features (dict, required): Feature definitions specifying which keys to normalize.

UnnormalizerProcessorStep

Reverse normalization.
from lerobot.processor import UnnormalizerProcessorStep

# Build the reverse step from the dataset statistics; only "action" is listed here.
unnormalizer = UnnormalizerProcessorStep(
    stats=dataset.meta.stats,
    features={"action": {...}},
)

# Pass a normalized action through the step to reverse the normalization.
action = unnormalizer(normalized_action)

DeviceProcessorStep

Move tensors to specific device.
from lerobot.processor import DeviceProcessorStep

# Create a step that targets the "cuda" device, then apply it to the data.
to_cuda = DeviceProcessorStep(device="cuda")
data_on_gpu = to_cuda(data_on_cpu)
Parameters:
  • device (str, required): Device string, e.g. "cpu", "cuda", or "cuda:0".

MapDeltaActionToRobotActionStep

Convert delta actions to absolute actions.
from lerobot.processor import MapDeltaActionToRobotActionStep

# The step is constructed without arguments.
delta_to_absolute = MapDeltaActionToRobotActionStep()

# Input: (delta_action, current_observation)
# Both values are passed together as a single tuple argument.
absolute_action = delta_to_absolute((delta_action, robot_obs))

RenameObservationsProcessorStep

Rename observation keys.
from lerobot.processor import RenameObservationsProcessorStep

# Each entry maps an existing observation key (left) to its new name (right).
renamer = RenameObservationsProcessorStep(
    rename_map={
        "observation.images.camera1": "observation.images.front",
    }
)

# Apply the renaming to an observation.
renamed_obs = renamer(observation)
Parameters:
  • rename_map (dict[str, str], required): Mapping from old keys to new keys.

AddBatchDimensionProcessorStep

Add batch dimension to tensors.
from lerobot.processor import AddBatchDimensionProcessorStep

# The step is constructed without arguments.
add_batch = AddBatchDimensionProcessorStep()

# (C, H, W) -> (1, C, H, W)
# i.e. a new leading dimension of size 1 is added.
batched = add_batch(unbatched_data)

Factory Functions

make_default_processors

from lerobot.processor import make_default_processors

# The factory returns three pipelines as a tuple; unpack them in return order.
# NOTE(review): order follows `make_default_processors` in
# src/lerobot/processor/factory.py (teleop action, robot action, robot
# observation) — confirm against your installed LeRobot version.
teleop_action_processor, robot_action_processor, robot_obs_processor = make_default_processors()
Create default processors for robot data.
Returns:
  • robot_obs_processor (RobotProcessorPipeline): Pipeline for processing robot observations.
  • robot_action_processor (RobotProcessorPipeline): Pipeline for processing robot actions.
  • teleop_action_processor (RobotProcessorPipeline): Pipeline for processing teleoperation actions.

make_default_robot_observation_processor

from lerobot.processor import make_default_robot_observation_processor

# Build the default observation pipeline and run the robot's current observation through it.
obs_processor = make_default_robot_observation_processor()
processed_obs = obs_processor(robot.get_observation())

make_default_robot_action_processor

from lerobot.processor import make_default_robot_action_processor

action_processor = make_default_robot_action_processor()
# The action and the observation are passed together as a single tuple argument.
processed_action = action_processor((action, observation))

Specialized Pipelines

PolicyProcessorPipeline

Pipeline for policy input/output processing.
from lerobot.processor import PolicyProcessorPipeline
from lerobot.policies.factory import make_pre_post_processors

# Build both pipelines from the policy config and the dataset statistics.
preprocessor, postprocessor = make_pre_post_processors(
    policy_cfg=config,
    dataset_stats=dataset.meta.stats,
)

# Preprocessing pipeline
processed_obs = preprocessor(observation)

# Postprocessing pipeline
action = policy.select_action(processed_obs)
final_action = postprocessor(action)

RobotProcessorPipeline

Pipeline for robot-specific processing.
from lerobot.processor import RobotProcessorPipeline

# Steps are supplied as a list; it is left empty in this example.
robot_pipeline = RobotProcessorPipeline([
    # Custom steps for robot
])

processed = robot_pipeline(robot_data)

Usage Examples

Training Pipeline

import torch
from torch.utils.data import DataLoader

from lerobot.datasets import LeRobotDataset
from lerobot.policies.factory import make_policy, make_pre_post_processors

# Load dataset
dataset = LeRobotDataset("lerobot/pusht")

# Create policy (`config` is the policy configuration supplied by the caller)
policy = make_policy(config, dataset.meta)

# Create preprocessor (the postprocessor is not needed for training)
preprocessor, _ = make_pre_post_processors(
    policy_cfg=config,
    dataset_stats=dataset.meta.stats,
)

# Create optimizer (the learning rate is an example value; tune it for your policy)
optimizer = torch.optim.Adam(policy.parameters(), lr=1e-4)

# Setup dataloader
dataloader = DataLoader(dataset, batch_size=32)

# Training loop
policy.train()
for batch in dataloader:
    # Preprocess batch
    batch = preprocessor(batch)

    # Forward pass
    loss, info = policy.forward(batch)

    # Backward pass: clear stale gradients first so they do not accumulate
    # from one step to the next
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Inference Pipeline

from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.processor import make_default_robot_action_processor
from lerobot.robots import make_robot_from_config

# Load components
robot = make_robot_from_config(robot_config)
policy = make_policy(cfg=None, pretrained_path="lerobot/diffusion_pusht")

# Create processors
preprocessor, postprocessor = make_pre_post_processors(
    policy_cfg=policy.config,
    pretrained_path="lerobot/diffusion_pusht",
)

robot_action_processor = make_default_robot_action_processor()

# Inference loop
robot.connect()
try:
    policy.reset()

    for step in range(100):
        # Get observation (kept unmodified for the robot action processor)
        robot_obs = robot.get_observation()

        # Policy inference
        policy_input = preprocessor(robot_obs)
        action = policy.select_action(policy_input)
        action = postprocessor(action)

        # Process for robot: pair the action with the raw robot observation,
        # not with the preprocessed policy input
        robot_action = robot_action_processor((action, robot_obs))

        # Send to robot
        robot.send_action(robot_action)
finally:
    # Always release the robot connection, even if a step raises
    robot.disconnect()

Custom Processor Step

from lerobot.processor import ProcessorStep
import torch

class CustomProcessorStep(ProcessorStep):
    """Multiply the ``"action"`` entry of a data dict by a constant factor.

    Args:
        scale: Factor applied to ``data["action"]``.
    """

    def __init__(self, scale: float):
        self.scale = scale

    def __call__(self, data: dict) -> dict:
        """Return ``data`` with its ``"action"`` entry scaled.

        The input dict is not modified, so the caller can keep using it.
        A dict without an ``"action"`` key is returned as-is.
        """
        if "action" not in data:
            return data
        # Shallow copy: only the "action" entry is replaced in the result.
        scaled = data.copy()
        scaled["action"] = data["action"] * self.scale
        return scaled

# Use in pipeline
from lerobot.processor import DataProcessorPipeline, DeviceProcessorStep

# The custom scaling step is chained with a transfer to the GPU.
pipeline = DataProcessorPipeline([
    CustomProcessorStep(scale=2.0),
    DeviceProcessorStep(device="cuda"),
])

processed = pipeline(data)

Delta Action Processing

from lerobot.processor import (
    MapTensorToDeltaActionDictStep,
    MapDeltaActionToRobotActionStep,
)

# End-to-end flow: policy tensor -> delta action dict -> absolute robot action.

# Convert policy tensor output to delta action dict
tensor_to_delta = MapTensorToDeltaActionDictStep(
    action_names=["x", "y", "z", "gripper"]
)

# Convert delta action to absolute action
delta_to_absolute = MapDeltaActionToRobotActionStep()

# Pipeline
policy_output = policy.select_action(observation)  # Tensor
delta_action = tensor_to_delta(policy_output)  # Dict
robot_obs = robot.get_observation()
# The delta action and the current robot observation are passed as one tuple.
absolute_action = delta_to_absolute((delta_action, robot_obs))

Observation Renaming

from lerobot.processor import RenameObservationsProcessorStep

# Robot has different camera names than policy expects
# Keys are the names the robot provides; values are the names to rename them to.
renamer = RenameObservationsProcessorStep(
    rename_map={
        "observation.images.webcam": "observation.images.top",
        "observation.joint_pos": "observation.state",
    }
)

# Fetch an observation from the robot and apply the renaming.
robot_obs = robot.get_observation()
renamed_obs = renamer(robot_obs)
# Now compatible with policy trained on different names
# Now compatible with policy trained on different names

Processor Registry

Register custom processor steps:
from lerobot.processor import ProcessorStep, ProcessorStepRegistry

registry = ProcessorStepRegistry()

# Register the class under a string name so it can be referenced by that name.
@registry.register("my_custom_step")
class MyCustomStep(ProcessorStep):
    """Pass-through step used to illustrate registration."""

    def __call__(self, data):
        return data

# Use in configuration
# NOTE(review): confirm that `create` is available on the registry in your
# installed LeRobot version.
step = registry.create("my_custom_step", config={})

See Also

Build docs developers (and LLMs) love