Documentation Index
Fetch the complete documentation index at: https://mintlify.com/huggingface/lerobot/llms.txt
Use this file to discover all available pages before exploring further.
LeRobot provides a unified Robot interface that makes it easy to integrate custom hardware. Once integrated, you can use all of LeRobot’s tools for data collection, training, and deployment.
Robot Interface
All LeRobot robots inherit from the Robot base class and implement a standard interface:
from lerobot.robots.robot import Robot
from lerobot.robots.config import RobotConfig
from lerobot.processor import RobotObservation, RobotAction
from dataclasses import dataclass
@dataclass
class MyRobotConfig(RobotConfig):
"""Configuration for MyRobot."""
port: str = "/dev/ttyUSB0"
baud_rate: int = 115200
# Add your robot-specific parameters
class MyRobot(Robot):
"""Custom robot implementation."""
# Required class attributes
name = "my_robot"
config_class = MyRobotConfig
def __init__(self, config: MyRobotConfig):
super().__init__(config)
self.port = config.port
self.baud_rate = config.baud_rate
self._is_connected = False
@property
def observation_features(self) -> dict:
"""Define observation space structure."""
return {
"state": (7,), # 7D joint positions
"pixels": {
"side": (480, 640, 3), # Camera dimensions
"wrist": (240, 320, 3),
}
}
@property
def action_features(self) -> dict:
"""Define action space structure."""
return {
"action": (7,), # 7D joint velocities
}
@property
def is_connected(self) -> bool:
"""Check if robot is connected."""
return self._is_connected
def connect(self) -> None:
"""Connect to the robot."""
# Initialize hardware connection
print(f"Connecting to robot on {self.port}...")
# Your connection code here
self._is_connected = True
print("Robot connected!")
def disconnect(self) -> None:
"""Disconnect from the robot."""
if self._is_connected:
print("Disconnecting robot...")
# Your disconnection code here
self._is_connected = False
def get_observation(self) -> RobotObservation:
"""Get current observation from robot."""
if not self.is_connected:
raise RuntimeError("Robot not connected")
# Read sensor data
joint_positions = self._read_joint_positions()
camera_images = self._capture_images()
return RobotObservation(
state=joint_positions,
pixels=camera_images
)
def send_action(self, action: RobotAction) -> None:
"""Send action command to robot."""
if not self.is_connected:
raise RuntimeError("Robot not connected")
# Extract action values
joint_velocities = action.action
# Send commands to hardware
self._write_joint_velocities(joint_velocities)
# Helper methods (implement based on your hardware)
def _read_joint_positions(self):
# Read from your hardware
pass
def _capture_images(self):
# Capture from cameras
pass
def _write_joint_velocities(self, velocities):
# Write to your hardware
pass
Complete Example: Serial Robot
Here’s a complete example for a robot with serial communication:
import numpy as np
import serial
import cv2
from lerobot.robots.robot import Robot
from lerobot.robots.config import RobotConfig
from lerobot.processor import RobotObservation, RobotAction
from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from dataclasses import dataclass
@dataclass
class SerialRobotConfig(RobotConfig):
"""Configuration for serial-controlled robot."""
port: str = "/dev/ttyUSB0"
baud_rate: int = 115200
timeout: float = 1.0
num_joints: int = 6
cameras: dict[str, OpenCVCameraConfig] | None = None
class SerialRobot(Robot):
"""Robot controlled via serial communication."""
name = "serial_robot"
config_class = SerialRobotConfig
def __init__(self, config: SerialRobotConfig):
super().__init__(config)
self.port = config.port
self.baud_rate = config.baud_rate
self.timeout = config.timeout
self.num_joints = config.num_joints
self.serial_connection = None
self.cameras = {}
# Initialize cameras if configured
if config.cameras:
for name, cam_config in config.cameras.items():
self.cameras[name] = OpenCVCamera(cam_config)
@property
def observation_features(self) -> dict:
features = {
"state": (self.num_joints,),
}
# Add camera features
if self.cameras:
features["pixels"] = {}
for name, camera in self.cameras.items():
features["pixels"][name] = (
camera.config.height,
camera.config.width,
3 # RGB
)
return features
@property
def action_features(self) -> dict:
return {
"action": (self.num_joints,),
}
@property
def is_connected(self) -> bool:
return self.serial_connection is not None and self.serial_connection.is_open
def connect(self) -> None:
"""Connect to robot via serial port."""
print(f"Connecting to {self.port} at {self.baud_rate} baud...")
try:
self.serial_connection = serial.Serial(
port=self.port,
baudrate=self.baud_rate,
timeout=self.timeout
)
print("Serial connection established.")
# Connect cameras
for name, camera in self.cameras.items():
camera.connect()
print(f"Camera '{name}' connected.")
# Wait for robot to initialize
time.sleep(2)
# Send initialization command
self._send_command("INIT")
except serial.SerialException as e:
raise RuntimeError(f"Failed to connect: {e}")
def disconnect(self) -> None:
"""Disconnect from robot."""
if self.serial_connection:
self._send_command("STOP")
self.serial_connection.close()
print("Serial connection closed.")
# Disconnect cameras
for camera in self.cameras.values():
camera.disconnect()
def get_observation(self) -> RobotObservation:
"""Read current state and images."""
if not self.is_connected:
raise RuntimeError("Robot not connected")
# Read joint positions
self._send_command("GET_STATE")
response = self._read_response()
joint_positions = np.array([float(x) for x in response.split(",")])
# Capture images from all cameras
images = {}
for name, camera in self.cameras.items():
frame = camera.read()
images[name] = frame
return RobotObservation(
state=joint_positions,
pixels=images if images else None
)
def send_action(self, action: RobotAction) -> None:
"""Send action command to robot."""
if not self.is_connected:
raise RuntimeError("Robot not connected")
# Format action as comma-separated values
action_str = ",".join([f"{x:.6f}" for x in action.action])
command = f"SET_ACTION,{action_str}"
self._send_command(command)
def _send_command(self, command: str) -> None:
"""Send command over serial."""
self.serial_connection.write(f"{command}\n".encode())
self.serial_connection.flush()
def _read_response(self) -> str:
"""Read response from serial."""
response = self.serial_connection.readline().decode().strip()
return response
Using Your Robot
Data Collection
Once implemented, use your robot for teleoperation and data collection:
lerobot-record \
--robot.type=serial_robot \
--robot.port=/dev/ttyUSB0 \
--robot.baud_rate=115200 \
--teleop.type=keyboard \
--repo-id=your_username/serial_robot_dataset \
--num-episodes=50
Training
Train policies on data collected from your robot:
lerobot-train \
--policy.type=diffusion \
--dataset.repo_id=your_username/serial_robot_dataset \
--steps=50000
Deployment
Deploy trained policies on your robot:
from lerobot.robots.serial_robot import SerialRobot, SerialRobotConfig
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from lerobot.cameras.opencv import OpenCVCameraConfig
import torch
# Configure robot
camera_config = {
"side": OpenCVCameraConfig(index_or_path=0, width=640, height=480),
}
robot_cfg = SerialRobotConfig(
port="/dev/ttyUSB0",
cameras=camera_config
)
# Load trained policy
policy = DiffusionPolicy.from_pretrained("your_username/serial_robot_policy")
policy.eval()
policy.to("cuda")
# Connect to robot
robot = SerialRobot(robot_cfg)
robot.connect()
try:
for step in range(100):
# Get observation
obs = robot.get_observation()
# Get action from policy
action = policy.select_action(obs)
# Execute action
robot.send_action(action)
finally:
robot.disconnect()
Camera Integration
LeRobot supports multiple camera backends:
OpenCV Cameras
from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
camera_config = OpenCVCameraConfig(
index_or_path=0, # Camera index or device path
width=640,
height=480,
fps=30
)
camera = OpenCVCamera(camera_config)
camera.connect()
# Read frame
frame = camera.read() # Returns numpy array [H, W, 3]
camera.disconnect()
Multiple Cameras
cameras = {
"side": OpenCVCamera(OpenCVCameraConfig(index_or_path=0)),
"wrist": OpenCVCamera(OpenCVCameraConfig(index_or_path=1)),
"top": OpenCVCamera(OpenCVCameraConfig(index_or_path=2)),
}
# Connect all
for camera in cameras.values():
camera.connect()
# Read all
images = {name: cam.read() for name, cam in cameras.items()}
Teleoperation Devices
Implement custom teleoperation devices:
from lerobot.teleoperators.config import TeleoperatorConfig
from lerobot.teleoperators.base import Teleoperator
from dataclasses import dataclass
@dataclass
class MyTeleopConfig(TeleoperatorConfig):
device_id: str = "/dev/input/js0"
sensitivity: float = 1.0
class MyTeleoperator(Teleoperator):
"""Custom teleoperation device."""
name = "my_teleop"
config_class = MyTeleopConfig
def __init__(self, config: MyTeleopConfig):
super().__init__(config)
self.device_id = config.device_id
self.sensitivity = config.sensitivity
def connect(self) -> None:
# Connect to device
pass
def disconnect(self) -> None:
# Disconnect from device
pass
def get_action(self) -> RobotAction:
"""Read action from teleoperation device."""
# Read from device and convert to action
pass
Motor Calibration
LeRobot provides tools for motor calibration:
# Run calibration
lerobot-calibrate \
--robot.type=serial_robot \
--robot.port=/dev/ttyUSB0 \
--robot.id=my_robot_001
Calibration files are saved to ~/.cache/lerobot/calibration/robots/serial_robot/my_robot_001.json
Best Practices
Define clear observation and action spaces
Specify the exact structure of observations and actions:
@property
def observation_features(self) -> dict:
return {
"state": (7,), # Joint positions
"pixels": {
"side": (480, 640, 3),
}
}
Implement proper connection handling
Use context managers for safe resource management:
with robot:
# Robot automatically connects
obs = robot.get_observation()
robot.send_action(action)
# Robot automatically disconnects
Handle hardware failures gracefully:
def get_observation(self) -> RobotObservation:
try:
state = self._read_state()
except Exception as e:
logging.error(f"Failed to read state: {e}")
# Return safe default or raise
raise
Test each component independently:
# Test connection
robot.connect()
assert robot.is_connected
# Test observation
obs = robot.get_observation()
assert obs.state.shape == (7,)
# Test action
action = RobotAction(action=np.zeros(7))
robot.send_action(action)
Supported Hardware
LeRobot includes implementations for:
- SO-100: LeRobot’s reference robot arm
- Koch: Low-cost robot arm
- LeKiwi: Mobile manipulation platform
- Reachy 2: Humanoid robot
- OpenARM: Open-source robot arm
- Unitree G1: Humanoid robot
- EarthRover: Mobile robot
See the robots documentation for setup guides.
Next Steps
Example Implementations
Study existing robot implementations: