## Overview
rfx supports two methods for integrating custom robots:
- YAML Configuration (simple, no code required)
- Python Backend Class (full control, custom transport logic)
Both methods expose the same three-method interface: `observe()`, `act()`, `reset()`.
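For type-checking your own integration, that contract can be written down as a structural Protocol. This is an illustrative sketch only (rfx does not necessarily export such a type), using `Any` in place of the `torch.Tensor` dicts the real methods return:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RobotInterface(Protocol):
    """Structural sketch of the interface every rfx robot satisfies."""

    def observe(self) -> dict[str, Any]: ...
    def act(self, action: Any) -> None: ...
    def reset(self) -> dict[str, Any]: ...
```

Any backend that implements these three methods (plus the lifecycle helpers shown below) will plug into the rest of the stack.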
## Method 1: YAML Configuration

For robots with standard interfaces (serial, Zenoh topics), you can define a YAML config and use the built-in backends.

### Example: 7-DOF Manipulator

Create `my_robot.yaml`:
```yaml
name: MyRobot 7-DOF
state_dim: 14        # 7 positions + 7 velocities
action_dim: 7        # 7 joint position commands
max_state_dim: 64
max_action_dim: 64
control_freq_hz: 100

joints:
  - name: joint_0
    index: 0
    position_min: -3.14159
    position_max: 3.14159
  - name: joint_1
    index: 1
    position_min: -2.0
    position_max: 2.0
  - name: joint_2
    index: 2
    position_min: -2.5
    position_max: 2.5
  - name: joint_3
    index: 3
    position_min: -1.57
    position_max: 1.57
  - name: joint_4
    index: 4
    position_min: -3.14159
    position_max: 3.14159
  - name: joint_5
    index: 5
    position_min: -1.57
    position_max: 1.57
  - name: joint_6
    index: 6
    position_min: -3.14159
    position_max: 3.14159

cameras:
  - name: wrist_cam
    width: 640
    height: 480
    fps: 30
    device_id: 0

hardware:
  # For serial robots (like SO-101)
  port: /dev/ttyUSB0
  baudrate: 115200

  # OR for Zenoh-based robots (like Innate)
  # zenoh_state_topic: myrobot/joint_states
  # zenoh_cmd_topic: myrobot/joint_commands
  # msg_format: json  # or "cdr" for ROS 2 CDR encoding
```
### Usage

```python
from rfx import RealRobot

# Load from config
robot = RealRobot.from_config("my_robot.yaml")

obs = robot.observe()
robot.act(action)
robot.disconnect()
```
### Config Reference

| Field | Description |
| --- | --- |
| `name` | Human-readable robot name (e.g., "MyRobot 7-DOF") |
| `state_dim` | Actual state dimension (e.g., 14 for 7 positions + 7 velocities) |
| `action_dim` | Actual action dimension (e.g., 7 for a 7-DOF arm) |
| `max_state_dim` | Padding size for multi-embodiment state compatibility |
| `max_action_dim` | Padding size for multi-embodiment actions |
| `control_freq_hz` | Target control loop frequency |
| `urdf_path` | Path to URDF file (for simulation) |
| `joints` | List of joint configurations with `name`, `index`, `position_min`, `position_max`, `velocity_max`, `effort_max` |
| `cameras` | List of camera configurations with `name`, `width`, `height`, `fps`, `device_id` |
| `hardware` | Hardware-specific settings (port, IP, Zenoh topics, etc.) |
## Method 2: Python Backend Class

For robots requiring custom communication logic, implement a backend class.

### Minimal Backend

Create `my_robot_backend.py`:
```python
import torch

from rfx.observation import make_observation
from rfx.robot.config import RobotConfig


class MyRobotBackend:
    """Custom backend for MyRobot."""

    def __init__(self, config: RobotConfig, **kwargs):
        self.config = config
        # Initialize hardware connection here
        self._connected = True

    def is_connected(self) -> bool:
        return self._connected

    def observe(self) -> dict[str, torch.Tensor]:
        """Read state from hardware and return observation dict."""
        # TODO: Read from your hardware
        positions = torch.zeros(self.config.action_dim, dtype=torch.float32)
        velocities = torch.zeros(self.config.action_dim, dtype=torch.float32)
        raw_state = torch.cat([positions, velocities]).unsqueeze(0)
        return make_observation(
            state=raw_state,
            state_dim=self.config.state_dim,
            max_state_dim=self.config.max_state_dim,
            device="cpu",
        )

    def act(self, action: torch.Tensor) -> None:
        """Send action to hardware."""
        # Extract the actual action dimensions (first action_dim values)
        targets = action[0, :self.config.action_dim].cpu().tolist()
        # TODO: Send to your hardware

    def reset(self) -> dict[str, torch.Tensor]:
        """Reset robot to home position."""
        # TODO: Send home command to hardware
        return self.observe()

    def go_home(self) -> None:
        """Return to home position."""
        # TODO: Send home command
        pass

    def disconnect(self) -> None:
        """Clean up hardware connection."""
        self._connected = False
        # TODO: Close hardware connection
```
### Register the Backend

#### Option 1: Modify `RealRobot._create_backend`

Edit `rfx/python/rfx/real/base.py`:

```python
def _create_backend(self, robot_type: str, hardware_config: dict):
    if robot_type == "myrobot":
        from my_robot_backend import MyRobotBackend
        return MyRobotBackend(config=self._config, **hardware_config)
    # ... existing backends
```
#### Option 2: Use the driver registry (advanced)

Create a driver plugin:

```python
# my_robot_driver.py
from rfx.drivers import register_driver

from my_robot_backend import MyRobotBackend

register_driver("myrobot", MyRobotBackend)
```

Import before use:

```python
import my_robot_driver  # registers the driver
from rfx import RealRobot

robot = RealRobot.from_config("my_robot.yaml", robot_type="myrobot")
```
### Usage

```python
import torch

from rfx import RealRobot

robot = RealRobot.from_config("my_robot.yaml", robot_type="myrobot")

obs = robot.observe()
# {"state": Tensor(1, 64)}, padded from state_dim to max_state_dim

action = torch.zeros(1, 64)  # max_action_dim=64
action[0, :7] = torch.tensor([...])  # set the first 7 joints
robot.act(action)
robot.disconnect()
```
## Example: Zenoh-Based Robot

For robots that publish state and accept commands over Zenoh topics (like the Innate bot), use the built-in `InnateBackend` as a reference.
### YAML Config

```yaml
name: MyZenohRobot
state_dim: 12
action_dim: 6
max_state_dim: 64
max_action_dim: 64
control_freq_hz: 50

hardware:
  zenoh_state_topic: myrobot/joint_states
  zenoh_cmd_topic: myrobot/joint_commands
  msg_format: json  # or "cdr" for ROS 2 CDR-encoded sensor_msgs/JointState
  zenoh_endpoint: tcp/192.168.1.100:7447  # optional router address
```
### Python Usage

```python
from rfx import RealRobot

robot = RealRobot.from_config("my_zenoh_robot.yaml", robot_type="innate")

obs = robot.observe()
robot.act(action)
robot.disconnect()
```
The Innate backend subscribes to `zenoh_state_topic` and publishes to `zenoh_cmd_topic`. State messages can be either:

- JSON: `{"position": [...], "velocity": [...], "effort": [...]}`
- CDR: ROS 2 `sensor_msgs/msg/JointState` (binary)

See `rfx/python/rfx/real/innate.py` for the full implementation.
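As a sketch of the JSON path, a state payload like the one above can be decoded into a flat position-plus-velocity vector with nothing but the standard library. The function name and the zero-velocity fallback are illustrative, not rfx's actual code:

```python
import json


def parse_json_joint_state(payload: bytes) -> list[float]:
    """Decode a JSON joint-state payload into [positions..., velocities...].

    Sketch only: the real InnateBackend also handles CDR messages and
    pads the result out to max_state_dim.
    """
    msg = json.loads(payload)
    positions = [float(p) for p in msg.get("position", [])]
    # Fall back to zero velocities if the publisher omits them (assumption)
    velocities = [float(v) for v in msg.get("velocity", [0.0] * len(positions))]
    return positions + velocities


payload = json.dumps({"position": [0.1, 0.2], "velocity": [0.0, 0.0]}).encode()
state = parse_json_joint_state(payload)  # [0.1, 0.2, 0.0, 0.0]
```

From there, the vector would be converted to a `torch.Tensor` and padded as in the Minimal Backend example.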
## Example: Serial Robot

For serial-based robots (like SO-101), the config specifies `port` and `baudrate`:

```yaml
name: MySerialArm
state_dim: 10
action_dim: 5
max_state_dim: 64
max_action_dim: 64
control_freq_hz: 50

hardware:
  port: /dev/ttyUSB0
  baudrate: 115200
```

If your protocol matches the Feetech serial bus, you can reuse the SO-101 backend. Otherwise, implement a custom backend class (see Method 2).
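If you do write a custom serial backend, the command path usually reduces to packing bytes for the bus. The frame layout below (header byte, joint id, little-endian float32 target) is purely hypothetical for illustration; it does not match the Feetech protocol or any real device:

```python
import struct

# Hypothetical frame header byte (assumption, not a real protocol constant)
FRAME_HEADER = 0xAA


def pack_position_command(joint_index: int, position: float) -> bytes:
    """Pack one joint position command into a 6-byte frame:
    header (1 byte) | joint id (1 byte) | target position (float32, LE)."""
    return struct.pack("<BBf", FRAME_HEADER, joint_index, position)


frame = pack_position_command(2, 1.5)
# In a real backend this frame would be written to the port,
# e.g. with pyserial: serial.Serial(port, baudrate).write(frame)
```

Your `act()` method would loop over the first `action_dim` values of the action tensor and send one frame per joint (or a single batched frame, depending on your protocol).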
## Multi-Embodiment Training

All robots in rfx pad states and actions to 64 dimensions for multi-embodiment compatibility:

```python
import torch

# SO-101: state_dim=12, action_dim=6
obs = so101.observe()
obs["state"].shape  # (1, 64) ← padded from 12

# Go2: state_dim=34, action_dim=12
obs = go2.observe()
obs["state"].shape  # (1, 64) ← padded from 34

# Actions are also padded
action = torch.zeros(1, 64)  # set the first N dims for your robot
so101.act(action)  # uses action[0, :6]
go2.act(action)    # uses action[0, :12]
```

This allows a single policy to be trained across multiple robot types. Adjust `max_state_dim` and `max_action_dim` if you need larger padding.
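The padding step itself can be sketched as a small helper (illustrative only; rfx performs this internally, e.g. via `make_observation`):

```python
import torch


def pad_state(state: torch.Tensor, max_dim: int = 64) -> torch.Tensor:
    """Zero-pad a (batch, state_dim) tensor out to (batch, max_dim)."""
    padded = torch.zeros(state.shape[0], max_dim, dtype=state.dtype)
    padded[:, : state.shape[1]] = state
    return padded


# SO-101-sized state (12 dims) padded into the shared 64-dim layout
state = torch.randn(1, 12)
padded = pad_state(state)  # shape (1, 64); dims 12..63 are zeros
```

On the action side the inverse happens: each backend simply slices off the first `action_dim` values, so the trailing zeros are ignored.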
## Named Joint Control

Use `MotorCommands` for semantic control:

```python
import rfx
from rfx.robot.config import RobotConfig

config = RobotConfig.from_yaml("my_robot.yaml")

# Control by joint name
action = rfx.MotorCommands(
    {"joint_0": 0.5, "joint_3": -0.2},
    config=config,
).to_tensor()
robot.act(action)
```

This maps joint names to indices and handles padding automatically.
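Conceptually, that name-to-index mapping looks like the sketch below (plain Python for clarity; the real `MotorCommands.to_tensor()` returns a padded `torch.Tensor` and this helper is not rfx's actual implementation):

```python
def commands_to_action(
    commands: dict[str, float],
    joint_index: dict[str, int],
    max_action_dim: int = 64,
) -> list[float]:
    """Place named joint targets into a zero-padded action vector."""
    action = [0.0] * max_action_dim
    for name, value in commands.items():
        action[joint_index[name]] = value
    return action


# Index map as it would be derived from the joints section of the YAML
idx = {"joint_0": 0, "joint_3": 3}
action = commands_to_action({"joint_0": 0.5, "joint_3": -0.2}, idx)
# action[0] == 0.5, action[3] == -0.2, everything else stays 0.0
```

Joints you leave out of the dict keep their zero entry, which is why partial commands are safe to send.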
## Adding Camera Support

Include cameras in the YAML config:

```yaml
cameras:
  - name: wrist_cam
    width: 640
    height: 480
    fps: 30
    device_id: 0
  - name: base_cam
    width: 1280
    height: 720
    fps: 60
    device_id: 1
```

In your backend, attach a `Camera` instance:
```python
import torch

from rfx.observation import make_observation
from rfx.real.camera import Camera
from rfx.robot.config import RobotConfig


class MyRobotBackend:
    def __init__(self, config: RobotConfig, camera_id=None, **kwargs):
        self.config = config
        self._camera = None
        if camera_id is not None:
            self._camera = Camera(device_id=camera_id)

    def observe(self) -> dict[str, torch.Tensor]:
        obs = make_observation(...)  # state only
        if self._camera is not None:
            image = self._camera.capture()  # (C, H, W)
            obs["images"] = image.unsqueeze(0).unsqueeze(0)  # (1, 1, C, H, W)
        return obs

    def disconnect(self) -> None:
        if self._camera is not None:
            self._camera.release()
```
See `rfx/python/rfx/real/so101.py:278` for a reference implementation.
## Testing Without Hardware

Use `MockRobot` to validate your config before connecting real hardware:

```python
import torch

from rfx import MockRobot

# Match your robot's dimensions
robot = MockRobot(state_dim=14, action_dim=7)

obs = robot.observe()
robot.act(torch.zeros(1, 64))
robot.reset()
```

`MockRobot` runs a simple spring-damper physics model in PyTorch, so no hardware or extra dependencies are required.
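For intuition, one step of such a spring-damper model can be sketched as follows (the gains, timestep, and semi-implicit Euler integrator are illustrative choices; MockRobot's actual dynamics may differ):

```python
import torch


def spring_damper_step(pos, vel, target, dt=0.01, k=10.0, c=2.0):
    """One semi-implicit Euler step of a spring-damper pulling pos toward target.

    k is the spring stiffness, c the damping coefficient (assumed values).
    """
    acc = k * (target - pos) - c * vel
    vel = vel + acc * dt
    pos = pos + vel * dt
    return pos, vel


pos = torch.zeros(7)
vel = torch.zeros(7)
target = torch.ones(7)
for _ in range(1000):
    pos, vel = spring_damper_step(pos, vel, target)
# after ~10 simulated seconds, pos has settled at target
```

Commanded joint positions act as the spring targets, so the mock robot's state drifts smoothly toward each action instead of jumping, which makes it a reasonable stand-in for testing control loops.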
## Deploying Policies

Once your robot is configured, deploy policies like any other robot:

```bash
# From local checkpoint
rfx deploy runs/my-policy --robot myrobot

# From HuggingFace Hub
rfx deploy hf://user/my-policy --robot myrobot --duration 30

# Mock mode for testing
rfx deploy runs/my-policy --robot myrobot --mock
```

Or from Python:

```python
import rfx

rfx.deploy("runs/my-policy", robot="myrobot")
rfx.deploy("hf://user/my-policy", robot="myrobot", duration=30)
```
## Full Backend Example: Innate Bot

See `rfx/python/rfx/real/innate.py` for a complete reference implementation of a Zenoh-based backend:

- Subscribes to `zenoh_state_topic` for joint states
- Publishes to `zenoh_cmd_topic` for commands
- Supports JSON and CDR message formats
- Handles threading and state locking
- Implements the full `observe()`, `act()`, `reset()` interface

Key snippets:
```python
# rfx/python/rfx/real/innate.py:114
class InnateBackend:
    def __init__(self, config, zenoh_state_topic=None, zenoh_cmd_topic=None, ...):
        import zenoh
        self._session = zenoh.open(zenoh.Config())
        self._cmd_publisher = self._session.declare_publisher(self._cmd_topic)
        self._subscriber = self._session.declare_subscriber(self._state_topic, self._on_state)

    def _on_state(self, sample):
        # Parse incoming state message (JSON or CDR)
        with self._state_lock:
            self._latest_state = parsed

    def observe(self):
        with self._state_lock:
            state = self._latest_state
        # Convert to torch.Tensor and pad

    def act(self, action):
        targets = action[0, :self.config.action_dim].cpu().tolist()
        payload = json.dumps({"position": targets}).encode()
        self._cmd_publisher.put(payload)
```
## Next Steps