Overview

rfx supports two methods for integrating custom robots:
  1. YAML Configuration (simple, no code required)
  2. Python Backend Class (full control, custom transport logic)
Both methods expose the same three-method interface: observe(), act(), reset().
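Expressed as a structural type, that shared contract looks roughly like this (an illustrative sketch; rfx does not necessarily ship this exact Protocol class):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class RobotInterface(Protocol):
    """The three-method contract both integration methods satisfy (illustrative)."""

    def observe(self) -> dict:
        """Return the latest observation, e.g. {"state": <padded tensor>}."""
        ...

    def act(self, action) -> None:
        """Apply a padded action tensor."""
        ...

    def reset(self) -> dict:
        """Move to a safe pose and return the initial observation."""
        ...
```

Any backend that implements these three methods, whether generated from YAML or written by hand, plugs into the same training and deployment tooling.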

Method 1: YAML Configuration

For robots with standard interfaces (serial, Zenoh topics), you can define a YAML config and use the built-in backends.

Example: 7-DOF Manipulator

Create my_robot.yaml:
name: MyRobot 7-DOF
state_dim: 14      # 7 positions + 7 velocities
action_dim: 7      # 7 joint position commands
max_state_dim: 64
max_action_dim: 64
control_freq_hz: 100

joints:
  - name: joint_0
    index: 0
    position_min: -3.14159
    position_max: 3.14159
  - name: joint_1
    index: 1
    position_min: -2.0
    position_max: 2.0
  - name: joint_2
    index: 2
    position_min: -2.5
    position_max: 2.5
  - name: joint_3
    index: 3
    position_min: -1.57
    position_max: 1.57
  - name: joint_4
    index: 4
    position_min: -3.14159
    position_max: 3.14159
  - name: joint_5
    index: 5
    position_min: -1.57
    position_max: 1.57
  - name: joint_6
    index: 6
    position_min: -3.14159
    position_max: 3.14159

cameras:
  - name: wrist_cam
    width: 640
    height: 480
    fps: 30
    device_id: 0

hardware:
  # For serial robots (like SO-101)
  port: /dev/ttyUSB0
  baudrate: 115200

  # OR for Zenoh-based robots (like Innate)
  # zenoh_state_topic: myrobot/joint_states
  # zenoh_cmd_topic: myrobot/joint_commands
  # msg_format: json  # or "cdr" for ROS 2 CDR encoding

Usage

from rfx import RealRobot

# Load from config
robot = RealRobot.from_config("my_robot.yaml")

obs = robot.observe()
robot.act(action)
robot.disconnect()

Config Reference

  • name (string, required): Human-readable robot name (e.g., "MyRobot 7-DOF")
  • state_dim (int, required): Actual state dimension (e.g., 14 for 7 positions + 7 velocities)
  • action_dim (int, required): Actual action dimension (e.g., 7 for a 7-DOF arm)
  • max_state_dim (int, default: 64): Padding size for multi-embodiment compatibility
  • max_action_dim (int, default: 64): Padding size for multi-embodiment actions
  • control_freq_hz (int, default: 50): Target control loop frequency
  • urdf_path (string, optional): Path to URDF file (for simulation)
  • joints (list[JointConfig], optional): Joint configurations with name, index, position_min, position_max, velocity_max, effort_max
  • cameras (list[CameraConfig], optional): Camera configurations with name, width, height, fps, device_id
  • hardware (dict, required): Hardware-specific settings (port, IP, Zenoh topics, etc.)
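For intuition, the required-key and default handling above amounts to something like the following (a hand-rolled sketch over a parsed config dict; the real validation lives inside rfx's RobotConfig):

```python
# Minimal validation pass over a parsed YAML config dict (illustrative).
REQUIRED = ("name", "state_dim", "action_dim", "hardware")
DEFAULTS = {"max_state_dim": 64, "max_action_dim": 64, "control_freq_hz": 50}

def validate_config(raw: dict) -> dict:
    """Check required keys, apply defaults, and sanity-check the padding sizes."""
    missing = [key for key in REQUIRED if key not in raw]
    if missing:
        raise ValueError(f"missing required config keys: {missing}")
    cfg = {**DEFAULTS, **raw}
    if cfg["state_dim"] > cfg["max_state_dim"]:
        raise ValueError("state_dim exceeds max_state_dim padding")
    if cfg["action_dim"] > cfg["max_action_dim"]:
        raise ValueError("action_dim exceeds max_action_dim padding")
    return cfg
```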

Method 2: Python Backend Class

For robots requiring custom communication logic, implement a backend class.

Minimal Backend

Create my_robot_backend.py:
import torch
from rfx.robot.config import RobotConfig
from rfx.observation import make_observation

class MyRobotBackend:
    """Custom backend for MyRobot."""

    def __init__(self, config: RobotConfig, **kwargs):
        self.config = config
        # Initialize hardware connection here
        self._connected = True

    def is_connected(self) -> bool:
        return self._connected

    def observe(self) -> dict[str, torch.Tensor]:
        """Read state from hardware and return observation dict."""
        # TODO: Read from your hardware
        positions = torch.zeros(self.config.action_dim, dtype=torch.float32)
        velocities = torch.zeros(self.config.action_dim, dtype=torch.float32)

        raw_state = torch.cat([positions, velocities]).unsqueeze(0)

        return make_observation(
            state=raw_state,
            state_dim=self.config.state_dim,
            max_state_dim=self.config.max_state_dim,
            device="cpu",
        )

    def act(self, action: torch.Tensor) -> None:
        """Send action to hardware."""
        # Extract actual action dimensions (first action_dim values)
        targets = action[0, :self.config.action_dim].cpu().tolist()
        # TODO: Send to your hardware

    def reset(self) -> dict[str, torch.Tensor]:
        """Reset robot to home position."""
        # TODO: Send home command to hardware
        return self.observe()

    def go_home(self) -> None:
        """Return to home position."""
        # TODO: Send home command
        pass

    def disconnect(self) -> None:
        """Clean up hardware connection."""
        self._connected = False
        # TODO: Close hardware connection

Register the Backend

Option 1: Modify RealRobot._create_backend Edit rfx/python/rfx/real/base.py:
def _create_backend(self, robot_type: str, hardware_config: dict):
    if robot_type == "myrobot":
        from my_robot_backend import MyRobotBackend
        return MyRobotBackend(config=self._config, **hardware_config)
    # ... existing backends
Option 2: Use the driver registry (advanced) Create a driver plugin:
# my_robot_driver.py
from rfx.drivers import register_driver
from my_robot_backend import MyRobotBackend

register_driver("myrobot", MyRobotBackend)
Import before use:
import my_robot_driver  # registers the driver
from rfx import RealRobot

robot = RealRobot.from_config("my_robot.yaml", robot_type="myrobot")

Usage

from rfx import RealRobot

robot = RealRobot.from_config("my_robot.yaml", robot_type="myrobot")

obs = robot.observe()
# {"state": Tensor(1, 64)} — padded from state_dim

action = torch.zeros(1, 64)  # max_action_dim=64
action[0, :7] = torch.tensor([...])  # set first 7 joints
robot.act(action)

robot.disconnect()
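If you want to drive the robot yourself rather than through rfx's deploy tooling, a rate-limited observe → policy → act loop pinned to control_freq_hz can be sketched as (illustrative; `rfx deploy` runs an equivalent loop for you):

```python
import time

def run_control_loop(robot, policy, freq_hz: float = 100, duration_s: float = 5.0) -> int:
    """Run observe -> policy -> act at a fixed rate and return the step count."""
    period = 1.0 / freq_hz
    steps = 0
    deadline = time.monotonic() + duration_s
    while time.monotonic() < deadline:
        tick = time.monotonic()
        obs = robot.observe()
        robot.act(policy(obs))
        steps += 1
        # Sleep off whatever is left of this control period
        time.sleep(max(0.0, period - (time.monotonic() - tick)))
    return steps
```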

Example: Zenoh-Based Robot

For robots that publish state and accept commands over Zenoh topics (like the Innate bot), use the built-in InnateBackend as a reference.

YAML Config

name: MyZenohRobot
state_dim: 12
action_dim: 6
max_state_dim: 64
max_action_dim: 64
control_freq_hz: 50

hardware:
  zenoh_state_topic: myrobot/joint_states
  zenoh_cmd_topic: myrobot/joint_commands
  msg_format: json  # or "cdr" for ROS 2 CDR-encoded sensor_msgs/JointState
  zenoh_endpoint: tcp/192.168.1.100:7447  # optional router address

Python Usage

from rfx import RealRobot

robot = RealRobot.from_config("my_zenoh_robot.yaml", robot_type="innate")

obs = robot.observe()
robot.act(action)
robot.disconnect()
The innate backend subscribes to zenoh_state_topic and publishes to zenoh_cmd_topic. State messages can be:
  • JSON: {"position": [...], "velocity": [...], "effort": [...]}
  • CDR: ROS 2 sensor_msgs/msg/JointState (binary)
See rfx/python/rfx/real/innate.py for the full implementation.
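The JSON path can be sketched in a few lines (illustrative only; the real backend in innate.py also handles CDR decoding, threading, and state locking):

```python
import json

def parse_joint_state(payload: bytes, msg_format: str = "json") -> dict:
    """Decode one joint-state message into position/velocity/effort lists."""
    if msg_format == "json":
        msg = json.loads(payload)
        # Missing fields default to empty lists, since "effort" is often omitted
        return {key: msg.get(key, []) for key in ("position", "velocity", "effort")}
    # CDR requires a ROS 2 deserializer and is out of scope for this sketch
    raise NotImplementedError(f"unsupported msg_format: {msg_format}")
```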

Example: Serial Robot

For serial-based robots (like SO-101), the config specifies port and baudrate:
name: MySerialArm
state_dim: 10
action_dim: 5
max_state_dim: 64
max_action_dim: 64
control_freq_hz: 50

hardware:
  port: /dev/ttyUSB0
  baudrate: 115200
If your protocol matches the Feetech serial bus, you can reuse the SO-101 backend. Otherwise, implement a custom backend class (see Method 2).
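Much of a custom serial backend comes down to framing bytes for your device's protocol. As a sketch, the act() path might pack joint targets like this (the frame layout below is hypothetical, not the Feetech bus protocol):

```python
import struct

SYNC = 0xAA  # hypothetical frame sync byte

def frame_position_command(targets: list[float]) -> bytes:
    """Pack joint targets into a sync + length + payload + checksum frame (hypothetical layout)."""
    payload = struct.pack(f"<{len(targets)}f", *targets)  # little-endian float32s
    checksum = sum(payload) & 0xFF                        # simple additive checksum
    return bytes([SYNC, len(payload)]) + payload + bytes([checksum])
```

On the wire you would write frames like this with pyserial's Serial.write() at the configured baudrate, then read and parse the reply in observe().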

Multi-Embodiment Training

All robots in rfx pad state and actions to 64 dimensions for multi-embodiment compatibility:
# SO-101: state_dim=12, action_dim=6
obs = so101.observe()
obs["state"].shape  # (1, 64)  ← padded from 12

# Go2: state_dim=34, action_dim=12
obs = go2.observe()
obs["state"].shape  # (1, 64)  ← padded from 34

# Actions are also padded
action = torch.zeros(1, 64)  # set first N dims for your robot
so101.act(action)  # uses action[0, :6]
go2.act(action)    # uses action[0, :12]
This allows training a single policy across multiple robot types. Adjust max_state_dim and max_action_dim if you need larger padding.
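The padding itself is mechanical; a sketch of what happens under the hood (illustrative, not rfx's actual code path):

```python
import torch
import torch.nn.functional as F

def pad_state(state: torch.Tensor, max_state_dim: int = 64) -> torch.Tensor:
    """Zero-pad a (1, state_dim) tensor on the right to (1, max_state_dim)."""
    return F.pad(state, (0, max_state_dim - state.shape[-1]))

def unpad_action(action: torch.Tensor, action_dim: int) -> torch.Tensor:
    """Slice the real action dims back out of a padded (1, max_action_dim) tensor."""
    return action[:, :action_dim]
```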

Named Joint Control

Use MotorCommands for semantic control:
import rfx
from rfx.robot.config import RobotConfig

config = RobotConfig.from_yaml("my_robot.yaml")

# Control by joint name
action = rfx.MotorCommands(
    {"joint_0": 0.5, "joint_3": -0.2},
    config=config,
).to_tensor()

robot.act(action)
This maps joint names to indices and handles padding automatically.
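Conceptually, the mapping is just name → index → padded vector (a sketch of the idea, not MotorCommands' internals):

```python
def commands_to_vector(commands: dict[str, float],
                       joint_index: dict[str, int],
                       max_action_dim: int = 64) -> list[float]:
    """Place named joint targets into a zero-padded action vector."""
    vec = [0.0] * max_action_dim
    for name, target in commands.items():
        vec[joint_index[name]] = float(target)  # KeyError on unknown joint names
    return vec
```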

Adding Camera Support

Include cameras in the YAML config:
cameras:
  - name: wrist_cam
    width: 640
    height: 480
    fps: 30
    device_id: 0
  - name: base_cam
    width: 1280
    height: 720
    fps: 60
    device_id: 1
In your backend, attach a Camera instance:
from rfx.real.camera import Camera

class MyRobotBackend:
    def __init__(self, config: RobotConfig, camera_id=None, **kwargs):
        self.config = config
        self._camera = None
        if camera_id is not None:
            self._camera = Camera(device_id=camera_id)

    def observe(self) -> dict[str, torch.Tensor]:
        obs = make_observation(...)  # state only
        if self._camera is not None:
            image = self._camera.capture()  # (C, H, W)
            obs["images"] = image.unsqueeze(0).unsqueeze(0)  # (1, 1, C, H, W)
        return obs

    def disconnect(self) -> None:
        if self._camera is not None:
            self._camera.release()
See rfx/python/rfx/real/so101.py:278 for a reference implementation.
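If you capture frames yourself instead of using the Camera helper, shaping them into the (1, 1, C, H, W) layout above is straightforward (a sketch; whether pixel values should be scaled to [0, 1] depends on your policy's preprocessing):

```python
import torch

def frame_to_obs_image(frame: torch.Tensor) -> torch.Tensor:
    """(H, W, C) uint8 frame -> (1, 1, C, H, W) float image in [0, 1]."""
    chw = frame.permute(2, 0, 1).float() / 255.0  # channels-first, normalized
    return chw.unsqueeze(0).unsqueeze(0)          # add batch and camera dims
```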

Testing Without Hardware

Use MockRobot to validate your config before connecting real hardware:
from rfx import MockRobot

# Matches your robot's dimensions
robot = MockRobot(state_dim=14, action_dim=7)

obs = robot.observe()
robot.act(torch.zeros(1, 64))
robot.reset()
MockRobot runs a simple spring-damper physics model in PyTorch—no external dependencies.
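The underlying physics is a plain spring-damper integrated step by step; a standalone sketch of the idea (illustrative constants, not MockRobot's exact model):

```python
def spring_damper_step(pos, vel, target, dt=0.02, k=10.0, c=2.0):
    """One semi-implicit Euler step pulling each joint toward its target."""
    acc = [k * (t - p) - c * v for p, v, t in zip(pos, vel, target)]
    vel = [v + a * dt for v, a in zip(vel, acc)]   # update velocity first
    pos = [p + v * dt for p, v in zip(pos, vel)]   # then position
    return pos, vel

# Each joint converges to its commanded target over a few hundred steps
pos, vel = [0.0], [0.0]
for _ in range(500):
    pos, vel = spring_damper_step(pos, vel, target=[1.0])
```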

Deploying Policies

Once your robot is configured, deploy policies like any other robot:
# From local checkpoint
rfx deploy runs/my-policy --robot myrobot

# From HuggingFace Hub
rfx deploy hf://user/my-policy --robot myrobot --duration 30

# Mock mode for testing
rfx deploy runs/my-policy --robot myrobot --mock
Or from Python:
import rfx

rfx.deploy("runs/my-policy", robot="myrobot")
rfx.deploy("hf://user/my-policy", robot="myrobot", duration=30)

Full Backend Example: Innate Bot

See rfx/python/rfx/real/innate.py for a complete reference implementation of a Zenoh-based backend:
  • Subscribes to zenoh_state_topic for joint states
  • Publishes to zenoh_cmd_topic for commands
  • Supports JSON and CDR message formats
  • Handles threading and state locking
  • Implements full observe(), act(), reset() interface
Key snippets:
# rfx/python/rfx/real/innate.py:114
class InnateBackend:
    def __init__(self, config, zenoh_state_topic=None, zenoh_cmd_topic=None, ...):
        import zenoh
        self._session = zenoh.open(zenoh.Config())
        self._cmd_publisher = self._session.declare_publisher(self._cmd_topic)
        self._subscriber = self._session.declare_subscriber(self._state_topic, self._on_state)

    def _on_state(self, sample):
        # Parse incoming state message (JSON or CDR)
        with self._state_lock:
            self._latest_state = parsed

    def observe(self):
        with self._state_lock:
            state = self._latest_state
        # Convert to torch.Tensor and pad

    def act(self, action):
        targets = action[0, :self.config.action_dim].cpu().tolist()
        payload = json.dumps({"position": targets}).encode()
        self._cmd_publisher.put(payload)
