Environment processors transform observations from simulation environments (like LIBERO, Isaac Lab Arena) into the LeRobot format. They handle environment-specific conventions like camera orientations, state representations, and data formats.
Overview
Environment processors bridge the gap between:
- Simulation environments: Each with their own observation format
- LeRobot format: Standardized observation structure expected by policies
They typically handle:
- Image preprocessing (rotation, format conversion)
- State extraction and concatenation
- Feature renaming and reorganization
LIBERO Environment
Process LIBERO environment observations:
from lerobot.processor import ProcessorStepRegistry, create_transition

# Create LIBERO processor
libero_processor = ProcessorStepRegistry.get("libero_processor")()

# Process observation
obs = {
    "observation.images.camera": camera_image,  # (B, C, H, W)
    "observation.robot_state": {
        "eef": {
            "pos": eef_position,  # (B, 3)
            "quat": eef_quat,  # (B, 4) - quaternion
        },
        "gripper": {
            "qpos": gripper_pos,  # (B, 2)
        },
    },
}
transition = create_transition(observation=obs)
processed = libero_processor(transition)
print(processed["observation"])
# Output:
# {
#     "observation.images.camera": <rotated_image>,  # Rotated 180°
#     "observation.state": <flat_state>,  # (B, 8) = [pos(3), axisangle(3), gripper(2)]
# }
See lerobot/processor/env_processor.py:26 for the implementation.
What LIBERO Processor Does
- Rotates images 180 degrees to match HuggingFaceVLA camera convention:
img = torch.flip(img, dims=[2, 3])  # Flip H (dim 2) and W (dim 3) of a (B, C, H, W) tensor
- Flattens robot state from nested dict to single vector:
# Extract components
eef_pos = robot_state["eef"]["pos"] # (B, 3)
eef_quat = robot_state["eef"]["quat"] # (B, 4)
gripper_qpos = robot_state["gripper"]["qpos"] # (B, 2)
# Convert quaternion to axis-angle
eef_axisangle = quat2axisangle(eef_quat) # (B, 3)
# Concatenate
state = torch.cat([eef_pos, eef_axisangle, gripper_qpos], dim=-1) # (B, 8)
- Updates feature shapes in transform_features():
def transform_features(self, features):
    # Replace nested robot_state with flat state
    features[FeatureType.STATE] = {
        "observation.state": PolicyFeature(
            type=FeatureType.STATE,
            shape=(8,),  # [eef_pos(3), axis_angle(3), gripper(2)]
        )
    }
    return features
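The quaternion-to-axis-angle step in the list above can be sketched in plain Python for a single quaternion. This illustrates the math only and is not the library's `quat2axisangle`: the name `quat_to_axis_angle` is hypothetical, and the (x, y, z, w) component order is an assumption that should be checked against your LIBERO version.

```python
import math


def quat_to_axis_angle(quat):
    """Convert one unit quaternion to an axis-angle vector (axis * angle).

    Assumes (x, y, z, w) component order; this ordering is an assumption.
    """
    x, y, z, w = quat
    # Clamp w so rounding errors cannot push acos() out of its domain
    w = max(-1.0, min(1.0, w))
    # den equals sin(angle / 2) for a unit quaternion
    den = math.sqrt(1.0 - w * w)
    if math.isclose(den, 0.0, abs_tol=1e-8):
        # (Near-)zero rotation: the axis is undefined, so return the zero vector
        return (0.0, 0.0, 0.0)
    angle = 2.0 * math.acos(w)
    scale = angle / den
    return (x * scale, y * scale, z * scale)


# A 90-degree rotation about the z axis
half = math.pi / 4
print(quat_to_axis_angle((0.0, 0.0, math.sin(half), math.cos(half))))
```

The result has three components, which is why the flat state is 3 + 3 + 2 = 8 wide rather than 3 + 4 + 2 = 9.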
Isaac Lab Arena Environment
Process Isaac Lab Arena observations:
from lerobot.processor import ProcessorStepRegistry, create_transition

# Create Isaac Lab processor
isaaclab_processor = ProcessorStepRegistry.get("isaaclab_arena_processor")(
    state_keys=("robot_joint_pos", "left_eef_pos", "right_eef_pos"),
    camera_keys=("robot_pov_cam_rgb", "third_person_cam_rgb"),
)

# Process observation
obs = {
    "observation.policy": {
        "robot_joint_pos": joint_positions,  # (B, 7)
        "left_eef_pos": left_eef,  # (B, 3)
        "right_eef_pos": right_eef,  # (B, 3)
        "object_pos": object_positions,  # (B, N, 3) - not used
    },
    "observation.camera_obs": {
        "robot_pov_cam_rgb": robot_cam,  # (B, H, W, C) uint8
        "third_person_cam_rgb": third_cam,  # (B, H, W, C) uint8
        "depth": depth_map,  # (B, H, W) - not used
    },
}
transition = create_transition(observation=obs)
processed = isaaclab_processor(transition)
print(processed["observation"])
# Output:
# {
#     "observation.images.robot_pov_cam_rgb": <image>,  # (B, C, H, W) float32 [0,1]
#     "observation.images.third_person_cam_rgb": <image>,  # (B, C, H, W) float32 [0,1]
#     "observation.state": <flat_state>,  # (B, 13) = concat of state_keys
# }
See lerobot/processor/env_processor.py:156 for the implementation.
What Isaac Lab Processor Does
- Converts images from (B, H, W, C) uint8 to (B, C, H, W) float32:
img = img.permute(0, 3, 1, 2).contiguous()  # BHWC -> BCHW
if img.dtype == torch.uint8:
    img = img.float() / 255.0  # [0, 255] -> [0.0, 1.0]
- Selectively extracts and concatenates state based on state_keys:
state_components = []
for key in self.state_keys:  # e.g., ["robot_joint_pos", "left_eef_pos", "right_eef_pos"]
    if key in policy_obs:
        component = policy_obs[key]
        # Flatten extra dims: (B, N, M) -> (B, N*M)
        if component.dim() > 2:
            batch_size = component.shape[0]
            component = component.view(batch_size, -1)
        state_components.append(component)
state = torch.cat(state_components, dim=-1)  # (B, total_state_dim)
- Renames features to LeRobot convention:
observation.camera_obs.<name> → observation.images.<name>
observation.policy → observation.state
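As a rough illustration of the renaming convention, the sketch below lists the keys the processed observation would contain for a given set of camera names. The helper `expected_output_keys` is hypothetical and not part of LeRobot; it only mirrors the mapping described above.

```python
def expected_output_keys(camera_keys):
    """List the LeRobot-style keys produced for the selected cameras.

    Hypothetical helper: each selected camera becomes
    observation.images.<name>, and the selected policy entries are merged
    into a single observation.state vector.
    """
    keys = [f"observation.images.{name}" for name in camera_keys]
    keys.append("observation.state")
    return keys


print(expected_output_keys(("robot_pov_cam_rgb", "third_person_cam_rgb")))
```

Entries that are not selected, such as the depth map in the example above, do not appear in the output.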
Configuration
- state_keys: Keys to extract from obs["policy"] and concatenate into the state vector. Order matters: it determines the layout of the state vector.
- camera_keys: Camera names to extract from obs["camera_obs"].
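To see why the key order matters, the sketch below computes where each key lands in the concatenated state vector. The helper `state_slices` and the per-sample shapes are assumptions for illustration, taken from the shapes in the Isaac Lab example above; they are not part of the LeRobot API.

```python
import math


def state_slices(state_keys, shapes):
    """Map each key to its slice in the concatenated state vector.

    `shapes` holds per-sample shapes (batch dimension excluded).
    Multi-dimensional entries are flattened, so (N, 3) occupies N * 3 slots.
    """
    slices = {}
    offset = 0
    for key in state_keys:
        size = math.prod(shapes[key])
        slices[key] = slice(offset, offset + size)
        offset += size
    return slices, offset


shapes = {"robot_joint_pos": (7,), "left_eef_pos": (3,), "right_eef_pos": (3,)}

slices, total = state_slices(("robot_joint_pos", "left_eef_pos", "right_eef_pos"), shapes)
print(total, slices["left_eef_pos"])

# Reordering the keys keeps the total size but moves every component
reordered, _ = state_slices(("left_eef_pos", "right_eef_pos", "robot_joint_pos"), shapes)
print(reordered["left_eef_pos"])
```

A policy trained with one ordering will misread states produced with another, so the same state_keys tuple should be used for training and evaluation.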
Custom Environment Processor
Create a processor for your custom environment:
from dataclasses import dataclass

import numpy as np
import torch

# Import path for FeatureType and PolicyFeature is assumed; adjust it to your LeRobot version
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.processor import ObservationProcessorStep, ProcessorStepRegistry
from lerobot.utils.constants import OBS_IMAGES, OBS_STATE


@ProcessorStepRegistry.register("my_env_processor")
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyCustomEnv.

    MyCustomEnv provides:
    - obs["rgb"]: (H, W, 3) numpy array in range [0, 255]
    - obs["depth"]: (H, W) numpy array
    - obs["proprio"]: dict with keys "joint_pos", "joint_vel", "force"

    We convert to LeRobot format:
    - observation.images.rgb: (B, 3, H, W) float32 [0, 1]
    - observation.state: (B, state_dim) concatenation of proprio
    """

    include_velocity: bool = True
    include_force: bool = False

    def observation(self, observation: dict) -> dict:
        processed = {}
        # Process RGB image
        if "rgb" in observation:
            rgb = observation["rgb"]
            # Convert numpy to tensor if needed
            if isinstance(rgb, np.ndarray):
                rgb = torch.from_numpy(rgb)
            # Add batch dim if needed
            if rgb.ndim == 3:
                rgb = rgb.unsqueeze(0)  # (H, W, C) -> (1, H, W, C)
            # Convert to (B, C, H, W) and normalize
            rgb = rgb.permute(0, 3, 1, 2).float() / 255.0
            processed[f"{OBS_IMAGES}.rgb"] = rgb
        # Process proprioception
        if "proprio" in observation:
            proprio = observation["proprio"]
            state_components = []
            # Always include joint position
            if "joint_pos" in proprio:
                state_components.append(torch.tensor(proprio["joint_pos"]).float())
            # Optionally include velocity
            if self.include_velocity and "joint_vel" in proprio:
                state_components.append(torch.tensor(proprio["joint_vel"]).float())
            # Optionally include force
            if self.include_force and "force" in proprio:
                state_components.append(torch.tensor(proprio["force"]).float())
            # Concatenate and add batch dim if needed (torch.cat fails on an empty list)
            if state_components:
                state = torch.cat(state_components, dim=-1)
                if state.ndim == 1:
                    state = state.unsqueeze(0)  # (N,) -> (1, N)
                processed[OBS_STATE] = state
        return processed

    def transform_features(self, features):
        """Update feature descriptions."""
        new_features = {}
        # Add image feature
        new_features[FeatureType.IMAGE] = {
            f"{OBS_IMAGES}.rgb": PolicyFeature(
                type=FeatureType.IMAGE,
                shape=(3, 224, 224),  # Assuming 224x224 images
            )
        }
        # Compute state dimension; must match what observation() concatenates
        state_dim = 7  # joint_pos
        if self.include_velocity:
            state_dim += 7  # joint_vel
        if self.include_force:
            state_dim += 6  # force
        new_features[FeatureType.STATE] = {
            OBS_STATE: PolicyFeature(
                type=FeatureType.STATE,
                shape=(state_dim,),
            )
        }
        return new_features


# Usage
processor = MyEnvProcessorStep(
    include_velocity=True,
    include_force=False,
)
Handling Multi-Dimensional States
Some environments provide multi-dimensional state arrays that need flattening:
# Uses the same imports as the custom processor example above
@ProcessorStepRegistry.register("flatten_state_processor")
@dataclass
class FlattenStateProcessorStep(ObservationProcessorStep):
    """Flatten multi-dimensional state observations."""

    state_keys: tuple[str, ...]

    def observation(self, observation: dict) -> dict:
        processed = observation.copy()
        state_components = []
        for key in self.state_keys:
            if key in observation:
                component = observation[key]
                # Flatten all dims except batch
                if component.dim() > 2:
                    batch_size = component.shape[0]
                    # reshape() also handles non-contiguous tensors, where view() would raise
                    component = component.reshape(batch_size, -1)
                elif component.dim() == 1:
                    component = component.unsqueeze(0)
                state_components.append(component)
        if state_components:
            processed[OBS_STATE] = torch.cat(state_components, dim=-1)
        return processed

    def transform_features(self, features):
        # Calculate total flattened dimension
        total_dim = 0
        for key in self.state_keys:
            if key in features.get(FeatureType.STATE, {}):
                feature = features[FeatureType.STATE][key]
                # int() keeps the shape a plain Python int rather than a NumPy scalar
                total_dim += int(np.prod(feature.shape))
        return {
            FeatureType.STATE: {
                OBS_STATE: PolicyFeature(
                    type=FeatureType.STATE,
                    shape=(total_dim,),
                )
            }
        }
Testing Environment Processors
Always test with realistic environment data:
import pytest
import torch
import numpy as np

# Import path for FeatureType is assumed; adjust it to your LeRobot version
from lerobot.configs.types import FeatureType
from lerobot.processor import create_transition

# MyEnvProcessorStep is the class defined in the custom processor example above;
# import it from wherever you defined it.


def test_my_env_processor():
    # Create realistic test data (randint's upper bound is exclusive, so use 256)
    observation = {
        "rgb": np.random.randint(0, 256, (224, 224, 3), dtype=np.uint8),
        "proprio": {
            "joint_pos": np.random.randn(7).astype(np.float32),
            "joint_vel": np.random.randn(7).astype(np.float32),
            "force": np.random.randn(6).astype(np.float32),
        },
    }
    # Create processor
    processor = MyEnvProcessorStep(
        include_velocity=True,
        include_force=True,
    )
    # Process
    transition = create_transition(observation=observation)
    processed = processor(transition)
    # Verify outputs
    assert "observation.images.rgb" in processed["observation"]
    assert "observation.state" in processed["observation"]
    # Check shapes
    rgb = processed["observation"]["observation.images.rgb"]
    assert rgb.shape == (1, 3, 224, 224)  # (B, C, H, W)
    assert rgb.dtype == torch.float32
    assert rgb.min() >= 0.0 and rgb.max() <= 1.0
    state = processed["observation"]["observation.state"]
    assert state.shape == (1, 20)  # 7 + 7 + 6
    assert state.dtype == torch.float32
    # Verify feature transformation
    input_features = {}
    output_features = processor.transform_features(input_features)
    assert FeatureType.IMAGE in output_features
    assert FeatureType.STATE in output_features
    assert output_features[FeatureType.STATE]["observation.state"].shape == (20,)
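Beyond shape checks, a test can also pin down the order in which components are concatenated. The sketch below is one hypothetical way to do that in plain Python: `STATE_LAYOUT` and `check_layout` are illustrative names, not LeRobot APIs, and the layout assumes the 7 + 7 + 6 split used in the test above.

```python
# Hypothetical layout for MyEnvProcessorStep with include_velocity=True and
# include_force=True: (start, stop) index ranges into the 20-dim state vector.
STATE_LAYOUT = {
    "joint_pos": (0, 7),
    "joint_vel": (7, 14),
    "force": (14, 20),
}


def check_layout(layout, total_dim):
    """Raise ValueError unless the ranges tile [0, total_dim) in order."""
    expected_start = 0
    for name, (start, stop) in layout.items():
        if start != expected_start or stop <= start:
            raise ValueError(
                f"{name!r} covers [{start}:{stop}] but should start at {expected_start}"
            )
        expected_start = stop
    if expected_start != total_dim:
        raise ValueError(f"layout ends at {expected_start}, expected {total_dim}")


# Passes silently when the layout has no gaps or overlaps
check_layout(STATE_LAYOUT, total_dim=20)
```

With such a layout in place, a test can slice the processed state with these ranges and compare each slice against the raw input it came from.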
Integration with Environments
Use environment processors in your gym environment:
import gymnasium as gym

from lerobot.processor import DataProcessorPipeline, create_transition


class MyGymEnv(gym.Env):
    def __init__(self):
        super().__init__()
        # Create observation processor
        self.obs_processor = DataProcessorPipeline(
            steps=[
                MyEnvProcessorStep(include_velocity=True),
                # Add more processors if needed
            ],
            name="obs_pipeline",
        )

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        # Get raw observation from environment
        # (_get_raw_observation is a placeholder for your environment's own logic)
        raw_obs = self._get_raw_observation()
        # Process to LeRobot format
        transition = create_transition(observation=raw_obs)
        processed = self.obs_processor(transition)
        # Gymnasium expects reset() to return (observation, info)
        return processed["observation"], {}

    def step(self, action):
        # Execute action
        # (_execute_action is a placeholder assumed to return the full step result)
        raw_obs, reward, terminated, truncated, info = self._execute_action(action)
        # Process observation
        transition = create_transition(observation=raw_obs)
        processed = self.obs_processor(transition)
        return processed["observation"], reward, terminated, truncated, info
Best Practices
1. Match Environment Conventions
Understand your environment’s data format:
# Document environment output format
"""
MyEnv observation format:
- Images: (H, W, C) uint8 [0, 255], BGR color space
- State: dict with keys ["pos", "vel", "acc"]
- Coordinate system: X-forward, Y-left, Z-up
"""

# Handle in processor
def observation(self, obs):
    # Convert BGR to RGB; copy() because torch.from_numpy rejects negative strides
    img = obs["rgb"][:, :, ::-1].copy()  # BGR -> RGB
    # Transform coordinates if needed
    # ...
2. Preserve Batch Dimension
Always maintain batch dimension:
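def observation(self, obs):
    state = obs["state"]  # placeholder key; expected shape is (B, N)
    expected_ndim = 2
    # Add batch dim if missing
    if state.ndim == expected_ndim - 1:
        state = state.unsqueeze(0)  # (N,) -> (1, N)
    # Process with batch dim
    # ...
    processed = {**obs, "state": state}
    return processed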
3. Document State Ordering
Clearly document concatenation order:
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
    """
    State vector composition (total: 20):
    - [0:7]: joint positions
    - [7:14]: joint velocities
    - [14:17]: end-effector position
    - [17:20]: end-effector orientation (axis-angle)
    """
API Reference
LiberoProcessorStep
See lerobot/processor/env_processor.py:26
Process LIBERO observation to LeRobot format
IsaaclabArenaProcessorStep
See lerobot/processor/env_processor.py:156
state_keys: Keys to extract from the policy observation
camera_keys: Camera names to extract from the camera observation
Process Isaac Lab observation to LeRobot format