The Robot protocol is the foundation of rfx. It provides a consistent API for controlling any robot — real hardware, simulation, or mock testing environments.

Protocol Definition

Every robot in rfx implements three core methods:
import torch
from typing import Protocol

class Robot(Protocol):
    def observe(self) -> dict[str, torch.Tensor]:
        """Get the current state as a dictionary of tensors."""
        ...

    def act(self, action: torch.Tensor) -> None:
        """Execute an action on the robot."""
        ...

    def reset(self, env_ids: torch.Tensor | None = None) -> dict[str, torch.Tensor]:
        """Reset environments and return the initial observation."""
        ...
Source: rfx/python/rfx/robot/__init__.py:27-107

Properties

Robots expose standardized properties for dimension handling:
| Property | Description |
| --- | --- |
| num_envs | Number of parallel environments (1 for real robots) |
| state_dim | Actual state dimensionality (e.g., 6 for SO-101) |
| action_dim | Actual action dimensionality (e.g., 6 for SO-101) |
| max_state_dim | Padded state dimension for multi-embodiment training (default: 64) |
| max_action_dim | Padded action dimension for multi-embodiment training (default: 64) |
| device | Device for tensors ("cuda" or "cpu") |
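These properties are tied together by a few invariants that are useful to assert in tests. A sketch, where `check_dims` and the `SimpleNamespace` stub are illustrative stand-ins (not rfx APIs), with SO-101-like dimensions:

```python
import torch
from types import SimpleNamespace

def check_dims(robot) -> None:
    """Invariants the dimension properties should satisfy together (illustrative)."""
    assert robot.num_envs >= 1
    assert robot.state_dim <= robot.max_state_dim
    assert robot.action_dim <= robot.max_action_dim
    state = robot.observe()["state"]
    assert tuple(state.shape) == (robot.num_envs, robot.max_state_dim)

# Stand-in robot with SO-101-like dimensions (6 DOF, padded to 64).
stub = SimpleNamespace(
    num_envs=1, state_dim=6, action_dim=6,
    max_state_dim=64, max_action_dim=64,
    observe=lambda: {"state": torch.zeros(1, 64)},
)
check_dims(stub)  # passes silently
```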

Observation Dictionary

The observe() method returns a dictionary with this key:
obs = robot.observe()
# obs is a dictionary of the form:
# {
#     "state": torch.Tensor,  # shape: (num_envs, max_state_dim)
# }
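Note that the state tensor is already padded to max_state_dim. A sketch of the zero-padding this implies, using a hypothetical `pad_state` helper (rfx's actual padding code lives inside the robot implementations; only the inverse, unpad_action, is shown later in this page):

```python
import torch

def pad_state(state: torch.Tensor, max_state_dim: int) -> torch.Tensor:
    """Zero-pad the last dimension of a (num_envs, state_dim) tensor up to max_state_dim."""
    num_envs, state_dim = state.shape
    padded = torch.zeros(num_envs, max_state_dim, device=state.device)
    padded[:, :state_dim] = state
    return padded

raw = torch.ones(2, 6)              # e.g. two envs of 6-DOF joint readings
obs_state = pad_state(raw, 64)
print(obs_state.shape)              # torch.Size([2, 64])
```

Dimensions beyond the robot's real state_dim are simply zeros, so a policy trained in the padded space sees a consistent input shape regardless of embodiment.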

Creating Robots

Real Hardware

import rfx

# From built-in config
robot = rfx.RealRobot.from_config(rfx.SO101_CONFIG, port="/dev/ttyACM0")

# From YAML file
robot = rfx.RealRobot.from_config("configs/so101.yaml", port="/dev/ttyACM0")

Simulation

import rfx

# Mock robot for testing
robot = rfx.MockRobot(state_dim=12, action_dim=6)

# Using factory functions
from rfx.robot import lerobot
arm = lerobot.so101()
Source: rfx/examples/deploy_real.py:93-103

Implementation Base Class

The RobotBase abstract class provides common functionality:
from rfx import RobotBase
import torch

class MyRobot(RobotBase):
    def __init__(self):
        super().__init__(
            state_dim=12,
            action_dim=6,
            num_envs=1,
            max_state_dim=64,
            max_action_dim=64,
            device="cpu"
        )
    
    def observe(self) -> dict[str, torch.Tensor]:
        # Read from hardware/sim
        state = self._read_joint_states()
        return {"state": state}
    
    def act(self, action: torch.Tensor) -> None:
        # Send commands to hardware/sim
        self._write_joint_commands(action)
    
    def reset(self, env_ids=None) -> dict[str, torch.Tensor]:
        # Reset to initial pose
        self._move_to_home_position()
        return self.observe()
Source: rfx/python/rfx/robot/__init__.py:110-170

Action Shapes

The act() method accepts two formats:
# Single-step action
action = torch.zeros(num_envs, max_action_dim)
robot.act(action)

# Multi-step action (action chunking)
action = torch.zeros(num_envs, horizon, max_action_dim)
robot.act(action)
Use robot.max_action_dim and robot.max_state_dim to create properly-sized tensors that work across different robot embodiments.
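One plausible way an implementation could route these two shapes, sketched with a hypothetical `dispatch` helper (rfx's actual chunk-handling logic may differ, e.g. in timing between steps):

```python
import torch

def dispatch(apply_step, action: torch.Tensor) -> int:
    """Route single-step vs. chunked actions; returns the number of steps executed."""
    if action.dim() == 2:                  # (num_envs, max_action_dim)
        apply_step(action)
        return 1
    if action.dim() == 3:                  # (num_envs, horizon, max_action_dim)
        horizon = action.shape[1]
        for t in range(horizon):           # play the chunk back one step at a time
            apply_step(action[:, t, :])
        return horizon
    raise ValueError(f"expected a 2-D or 3-D action, got {action.dim()}-D")

steps = []
dispatch(steps.append, torch.zeros(1, 64))       # executes 1 step
dispatch(steps.append, torch.zeros(1, 5, 64))    # executes 5 steps
print(len(steps))  # 6
```

Action chunking lets a policy emit a short trajectory per inference call, which is common for diffusion- and transformer-style policies.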

Multi-Embodiment Support

The padding system enables training a single policy across different robots:
# SO-101: 6 DOF arm
so101 = rfx.RealRobot.from_config(rfx.SO101_CONFIG)
print(so101.action_dim)      # 6
print(so101.max_action_dim)  # 64 (padded)

# G1: 23 DOF humanoid
g1 = rfx.RealRobot.from_config(rfx.G1_CONFIG)
print(g1.action_dim)         # 23
print(g1.max_action_dim)     # 64 (padded)

# Same policy works for both
def policy(obs):
    return model(obs["state"])  # Both use (1, 64) state
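The same padded policy output can then be sliced per embodiment before it reaches each driver. A sketch using a local `unpad` function that mirrors what rfx's unpad_action does (the local re-implementation is for illustration only):

```python
import torch

def unpad(action_padded: torch.Tensor, action_dim: int) -> torch.Tensor:
    """Slice padding off a (num_envs, max_action_dim) action (mirrors rfx.unpad_action)."""
    return action_padded[:, :action_dim]

shared = torch.arange(64, dtype=torch.float32).unsqueeze(0)  # one policy output, (1, 64)
so101_cmd = unpad(shared, 6)    # (1, 6) command for the 6-DOF arm
g1_cmd = unpad(shared, 23)      # (1, 23) command for the 23-DOF humanoid
print(so101_cmd.shape, g1_cmd.shape)
```

The policy never needs to know which robot it is driving; only the final slice differs.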

Best Practices

Real robots have num_envs=1, but simulators may have thousands. Always use batch operations:
obs = robot.observe()
# Don't: obs["state"][0]  
# Do: obs["state"]  # Already batched
When sending actions to hardware drivers that don’t expect padding:
from rfx import unpad_action

action_padded = policy(obs)  # (1, 64)
action_actual = unpad_action(action_padded, robot.action_dim)  # (1, 6)
Source: rfx/python/rfx/observation.py:64-71
Always call reset() before your control loop:
obs = robot.reset()
for _ in range(steps):
    action = policy(obs)
    robot.act(action)
    obs = robot.observe()

Next Steps

Observations: Learn about observation specs and multi-modal data.
Policies: Define policies that map observations to actions.
Control Loop: Run policies on robots with rate control.
