Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/huggingface/lerobot/llms.txt

Use this file to discover all available pages before exploring further.

Environment processors transform observations from simulation environments (like LIBERO, Isaac Lab Arena) into the LeRobot format. They handle environment-specific conventions like camera orientations, state representations, and data formats.

Overview

Environment processors bridge the gap between:
  • Simulation environments: Each with their own observation format
  • LeRobot format: Standardized observation structure expected by policies
They typically handle:
  • Image preprocessing (rotation, format conversion)
  • State extraction and concatenation
  • Feature renaming and reorganization

LIBERO Environment

Process LIBERO environment observations:
from lerobot.processor import ProcessorStepRegistry

# Create LIBERO processor
libero_processor = ProcessorStepRegistry.get("libero_processor")()

# Process observation
obs = {
    "observation.images.camera": camera_image,  # (B, H, W, C)
    "observation.robot_state": {
        "eef": {
            "pos": eef_position,    # (B, 3)
            "quat": eef_quat,       # (B, 4) - quaternion
        },
        "gripper": {
            "qpos": gripper_pos,    # (B, 2)
        },
    },
}

transition = create_transition(observation=obs)
processed = libero_processor(transition)

print(processed["observation"])
# Output:
# {
#   "observation.images.camera": <rotated_image>,  # Rotated 180°
#   "observation.state": <flat_state>,             # (B, 8) = [pos(3), axisangle(3), gripper(2)]
# }
See lerobot/processor/env_processor.py:26 for the implementation.

What LIBERO Processor Does

  1. Rotates images 180 degrees to match HuggingFaceVLA camera convention:
img = torch.flip(img, dims=[2, 3])  # Flip H and W
  1. Flattens robot state from nested dict to single vector:
# Extract components
eef_pos = robot_state["eef"]["pos"]        # (B, 3)
eef_quat = robot_state["eef"]["quat"]      # (B, 4)
gripper_qpos = robot_state["gripper"]["qpos"]  # (B, 2)

# Convert quaternion to axis-angle
eef_axisangle = quat2axisangle(eef_quat)   # (B, 3)

# Concatenate
state = torch.cat([eef_pos, eef_axisangle, gripper_qpos], dim=-1)  # (B, 8)
  1. Updates feature shapes in transform_features():
def transform_features(self, features):
    # Replace nested robot_state with flat state
    features[FeatureType.STATE] = {
        "observation.state": PolicyFeature(
            type=FeatureType.STATE,
            shape=(8,)  # [eef_pos(3), axis_angle(3), gripper(2)]
        )
    }
    return features

Isaac Lab Arena Environment

Process Isaac Lab Arena observations:
from lerobot.processor import ProcessorStepRegistry

# Create Isaac Lab processor
isaaclab_processor = ProcessorStepRegistry.get("isaaclab_arena_processor")(
    state_keys=("robot_joint_pos", "left_eef_pos", "right_eef_pos"),
    camera_keys=("robot_pov_cam_rgb", "third_person_cam_rgb")
)

# Process observation
obs = {
    "observation.policy": {
        "robot_joint_pos": joint_positions,    # (B, 7)
        "left_eef_pos": left_eef,              # (B, 3)
        "right_eef_pos": right_eef,            # (B, 3)
        "object_pos": object_positions,        # (B, N, 3) - not used
    },
    "observation.camera_obs": {
        "robot_pov_cam_rgb": robot_cam,        # (B, H, W, C) uint8
        "third_person_cam_rgb": third_cam,     # (B, H, W, C) uint8
        "depth": depth_map,                    # (B, H, W) - not used
    },
}

transition = create_transition(observation=obs)
processed = isaaclab_processor(transition)

print(processed["observation"])
# Output:
# {
#   "observation.images.robot_pov_cam_rgb": <image>,      # (B, C, H, W) float32 [0,1]
#   "observation.images.third_person_cam_rgb": <image>,   # (B, C, H, W) float32 [0,1]
#   "observation.state": <flat_state>,                     # (B, 13) = concat of state_keys
# }
See lerobot/processor/env_processor.py:156 for the implementation.

What Isaac Lab Processor Does

  1. Converts images from (B, H, W, C) uint8 to (B, C, H, W) float32:
img = img.permute(0, 3, 1, 2).contiguous()  # BHWC -> BCHW
if img.dtype == torch.uint8:
    img = img.float() / 255.0  # [0, 255] -> [0.0, 1.0]
  1. Selectively extracts and concatenates state based on state_keys:
state_components = []
for key in self.state_keys:  # e.g., ["robot_joint_pos", "left_eef_pos", "right_eef_pos"]
    if key in policy_obs:
        component = policy_obs[key]
        # Flatten extra dims: (B, N, M) -> (B, N*M)
        if component.dim() > 2:
            batch_size = component.shape[0]
            component = component.view(batch_size, -1)
        state_components.append(component)

state = torch.cat(state_components, dim=-1)  # (B, total_state_dim)
  1. Renames features to LeRobot convention:
  • observation.camera_obs.<name>observation.images.<name>
  • observation.policyobservation.state

Configuration

state_keys
tuple[str, ...]
required
Keys to extract from obs["policy"] and concatenate into state vector. Order matters!
camera_keys
tuple[str, ...]
required
Camera names to extract from obs["camera_obs"]

Custom Environment Processor

Create a processor for your custom environment:
from lerobot.processor import ObservationProcessorStep, ProcessorStepRegistry
from lerobot.utils.constants import OBS_IMAGES, OBS_STATE
import torch

@ProcessorStepRegistry.register("my_env_processor")
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyCustomEnv.
    
    MyCustomEnv provides:
    - obs["rgb"]: (H, W, 3) numpy array in range [0, 255]
    - obs["depth"]: (H, W) numpy array  
    - obs["proprio"]: dict with keys "joint_pos", "joint_vel", "force"
    
    We convert to LeRobot format:
    - observation.images.rgb: (B, 3, H, W) float32 [0, 1]
    - observation.state: (B, state_dim) concatenation of proprio
    """
    
    include_velocity: bool = True
    include_force: bool = False
    
    def observation(self, observation: dict) -> dict:
        processed = {}
        
        # Process RGB image
        if "rgb" in observation:
            rgb = observation["rgb"]
            
            # Convert numpy to tensor if needed
            if isinstance(rgb, np.ndarray):
                rgb = torch.from_numpy(rgb)
            
            # Add batch dim if needed
            if rgb.ndim == 3:
                rgb = rgb.unsqueeze(0)  # (H, W, C) -> (B, H, W, C)
            
            # Convert to (B, C, H, W) and normalize
            rgb = rgb.permute(0, 3, 1, 2).float() / 255.0
            
            processed[f"{OBS_IMAGES}.rgb"] = rgb
        
        # Process proprioception
        if "proprio" in observation:
            proprio = observation["proprio"]
            
            state_components = []
            
            # Always include joint position
            if "joint_pos" in proprio:
                state_components.append(torch.tensor(proprio["joint_pos"]).float())
            
            # Optionally include velocity
            if self.include_velocity and "joint_vel" in proprio:
                state_components.append(torch.tensor(proprio["joint_vel"]).float())
            
            # Optionally include force
            if self.include_force and "force" in proprio:
                state_components.append(torch.tensor(proprio["force"]).float())
            
            # Concatenate and add batch dim if needed
            state = torch.cat(state_components, dim=-1)
            if state.ndim == 1:
                state = state.unsqueeze(0)  # (N,) -> (B, N)
            
            processed[OBS_STATE] = state
        
        return processed
    
    def transform_features(self, features):
        """Update feature descriptions."""
        new_features = {}
        
        # Add image feature
        new_features[FeatureType.IMAGE] = {
            f"{OBS_IMAGES}.rgb": PolicyFeature(
                type=FeatureType.IMAGE,
                shape=(3, 224, 224)  # Assuming 224x224 images
            )
        }
        
        # Compute state dimension
        state_dim = 7  # joint_pos
        if self.include_velocity:
            state_dim += 7  # joint_vel
        if self.include_force:
            state_dim += 6  # force
        
        new_features[FeatureType.STATE] = {
            OBS_STATE: PolicyFeature(
                type=FeatureType.STATE,
                shape=(state_dim,)
            )
        }
        
        return new_features

# Usage
processor = MyEnvProcessorStep(
    include_velocity=True,
    include_force=False
)

Handling Multi-Dimensional States

Some environments provide multi-dimensional state arrays that need flattening:
@ProcessorStepRegistry.register("flatten_state_processor")
@dataclass
class FlattenStateProcessorStep(ObservationProcessorStep):
    """Flatten multi-dimensional state observations."""
    
    state_keys: tuple[str, ...]
    
    def observation(self, observation: dict) -> dict:
        processed = observation.copy()
        
        state_components = []
        for key in self.state_keys:
            if key in observation:
                component = observation[key]
                
                # Flatten all dims except batch
                if component.dim() > 2:
                    batch_size = component.shape[0]
                    component = component.view(batch_size, -1)
                elif component.dim() == 1:
                    component = component.unsqueeze(0)
                
                state_components.append(component)
        
        if state_components:
            processed[OBS_STATE] = torch.cat(state_components, dim=-1)
        
        return processed
    
    def transform_features(self, features):
        # Calculate total flattened dimension
        total_dim = 0
        for key in self.state_keys:
            if key in features.get(FeatureType.STATE, {}):
                feature = features[FeatureType.STATE][key]
                total_dim += np.prod(feature.shape)
        
        return {
            FeatureType.STATE: {
                OBS_STATE: PolicyFeature(
                    type=FeatureType.STATE,
                    shape=(total_dim,)
                )
            }
        }

Testing Environment Processors

Always test with realistic environment data:
import pytest
import torch
import numpy as np
from lerobot.processor import create_transition

def test_my_env_processor():
    # Create realistic test data
    observation = {
        "rgb": np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8),
        "proprio": {
            "joint_pos": np.random.randn(7).astype(np.float32),
            "joint_vel": np.random.randn(7).astype(np.float32),
            "force": np.random.randn(6).astype(np.float32),
        },
    }
    
    # Create processor
    processor = MyEnvProcessorStep(
        include_velocity=True,
        include_force=True
    )
    
    # Process
    transition = create_transition(observation=observation)
    processed = processor(transition)
    
    # Verify outputs
    assert "observation.images.rgb" in processed["observation"]
    assert "observation.state" in processed["observation"]
    
    # Check shapes
    rgb = processed["observation"]["observation.images.rgb"]
    assert rgb.shape == (1, 3, 224, 224)  # (B, C, H, W)
    assert rgb.dtype == torch.float32
    assert rgb.min() >= 0.0 and rgb.max() <= 1.0
    
    state = processed["observation"]["observation.state"]
    assert state.shape == (1, 20)  # 7 + 7 + 6
    assert state.dtype == torch.float32
    
    # Verify feature transformation
    input_features = {}
    output_features = processor.transform_features(input_features)
    
    assert FeatureType.IMAGE in output_features
    assert FeatureType.STATE in output_features
    assert output_features[FeatureType.STATE]["observation.state"].shape == (20,)

Integration with Environments

Use environment processors in your gym environment:
import gymnasium as gym
from lerobot.processor import DataProcessorPipeline

class MyGymEnv(gym.Env):
    def __init__(self):
        # Create observation processor
        self.obs_processor = DataProcessorPipeline(
            steps=[
                MyEnvProcessorStep(include_velocity=True),
                # Add more processors if needed
            ],
            name="obs_pipeline"
        )
    
    def reset(self):
        # Get raw observation from environment
        raw_obs = self._get_raw_observation()
        
        # Process to LeRobot format
        transition = create_transition(observation=raw_obs)
        processed = self.obs_processor(transition)
        
        return processed["observation"]
    
    def step(self, action):
        # Execute action
        raw_obs = self._execute_action(action)
        
        # Process observation
        transition = create_transition(observation=raw_obs)
        processed = self.obs_processor(transition)
        
        return processed["observation"], reward, done, truncated, info

Best Practices

1. Match Environment Conventions

Understand your environment’s data format:
# Document environment output format
"""
MyEnv observation format:
- Images: (H, W, C) uint8 [0, 255], BGR color space
- State: dict with keys ["pos", "vel", "acc"]
- Coordinate system: X-forward, Y-left, Z-up
"""

# Handle in processor
def observation(self, obs):
    # Convert BGR to RGB
    img = obs["rgb"][:, :, ::-1]  # BGR -> RGB
    
    # Transform coordinates if needed
    # ...

2. Preserve Batch Dimension

Always maintain batch dimension:
def observation(self, obs):
    # Add batch dim if missing
    if tensor.ndim == expected_ndim - 1:
        tensor = tensor.unsqueeze(0)
    
    # Process with batch dim
    # ...
    
    return processed

3. Document State Ordering

Clearly document concatenation order:
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
    """
    State vector composition (total: 20):
    - [0:7]:   joint positions
    - [7:14]:  joint velocities  
    - [14:17]: end-effector position
    - [17:20]: end-effector orientation (axis-angle)
    """

API Reference

LiberoProcessorStep

See lerobot/processor/env_processor.py:26
observation
(observation) -> dict
Process LIBERO observation to LeRobot format

IsaaclabArenaProcessorStep

See lerobot/processor/env_processor.py:156
state_keys
tuple[str, ...]
Keys to extract from policy observation
camera_keys
tuple[str, ...]
Camera names to extract from camera observation
observation
(observation) -> dict
Process Isaac Lab observation to LeRobot format

Build docs developers (and LLMs) love