Skip to main content

Overview

The rfx driver system allows you to extend the framework with custom robot interfaces. Drivers are registered via the global registry and can be discovered through entry points or explicit registration.

Driver Protocol

All robot drivers must implement the RobotDriver protocol:
from typing import Protocol, Any, runtime_checkable

@runtime_checkable
class RobotDriver(Protocol):
    """Protocol that all robot drivers must implement."""

    def connect(self, **kwargs: Any) -> None: ...
    def disconnect(self) -> None: ...
    def is_connected(self) -> bool: ...
    def read_state(self) -> dict[str, Any]: ...
    def send_command(self, command: dict[str, Any]) -> None: ...

    @property
    def name(self) -> str: ...

    @property
    def num_joints(self) -> int: ...
The @runtime_checkable decorator enables runtime type checking with isinstance(). All drivers are validated against this protocol at registration time.

Creating a Custom Driver

Implement the RobotDriver protocol to create a new robot interface:
import rfx
from typing import Any

class MyRobotDriver:
    """Custom driver for MyRobot platform."""

    def __init__(self, robot_id: str = "robot-01"):
        self._robot_id = robot_id
        self._connected = False
        self._state = {}

    @property
    def name(self) -> str:
        return f"MyRobot-{self._robot_id}"

    @property
    def num_joints(self) -> int:
        return 12  # Example: 12-DOF robot

    def connect(self, **kwargs: Any) -> None:
        """Establish connection to the robot."""
        ip_address = kwargs.get("ip_address", "192.168.1.100")
        # Your connection logic here
        self._connected = True
        print(f"Connected to {self.name} at {ip_address}")

    def disconnect(self) -> None:
        """Close connection to the robot."""
        # Your disconnection logic here
        self._connected = False

    def is_connected(self) -> bool:
        """Check if robot is connected."""
        return self._connected

    def read_state(self) -> dict[str, Any]:
        """Read current robot state."""
        # Your state reading logic here
        return {
            "joint_positions": [0.0] * self.num_joints,
            "joint_velocities": [0.0] * self.num_joints,
            "imu": {"roll": 0.0, "pitch": 0.0, "yaw": 0.0},
        }

    def send_command(self, command: dict[str, Any]) -> None:
        """Send command to the robot."""
        # Your command sending logic here
        if not self._connected:
            raise RuntimeError("Robot not connected")
        print(f"Sending command: {command}")

Registering Drivers

Explicit Registration

Register your driver directly with the register_driver function:
import rfx
from rfx.drivers import register_driver

# Register the driver factory
register_driver("myrobot", MyRobotDriver)

# Verify registration
available = rfx.drivers.list_drivers()
print(f"Available drivers: {available}")
# Output: Available drivers: ['myrobot']
from rfx.drivers import get_driver

# Retrieve the driver class
DriverClass = get_driver("myrobot")
if DriverClass:
    robot = DriverClass(robot_id="alpha-01")
    robot.connect(ip_address="192.168.1.100")

Entry Point Registration

For distributable packages, use entry points in pyproject.toml:
[project.entry-points."rfx.drivers"]
myrobot = "myrobot_package:MyRobotDriver"
custom_arm = "myarm_package.drivers:ArmDriver"
Drivers registered via entry points are auto-discovered on import:
import rfx  # Auto-discovers all rfx.drivers entry points

# Your driver is now available
driver = rfx.drivers.get_driver("myrobot")

Integration with Rust Core

For performance-critical drivers, implement the core logic in Rust and expose via PyO3:
import _rfx  # The compiled Rust extension

class RustBackedDriver:
    """Driver with Rust-based communication backend."""

    def __init__(self):
        # Use Rust types for high-performance I/O
        self._transport = _rfx.Transport.zenoh(
            connect=["tcp/192.168.1.100:7447"],
            shared_memory=True
        )
        self._state_sub = self._transport.subscribe("robot/state")

    def read_state(self) -> dict[str, Any]:
        """Read state using zero-copy Rust transport."""
        envelope = self._state_sub.try_recv()
        if envelope:
            # Parse payload from Rust
            import json
            return json.loads(envelope.payload.decode())
        return {}
The Rust core handles DDS communication, control loops, and real-time math. Python drivers wrap Rust types via PyO3 bindings for optimal performance.

Driver Registry Internals

The driver registry is implemented in rfx/python/rfx/drivers.py:14-73:
# Global driver registry
_DRIVER_REGISTRY: dict[str, type] = {}

def register_driver(name: str, factory: type) -> None:
    """Register a driver factory by name."""
    _DRIVER_REGISTRY[name] = factory
    logger.debug("Registered driver: %s", name)

def get_driver(name: str) -> type | None:
    """Get a registered driver factory by name."""
    return _DRIVER_REGISTRY.get(name)
The registry uses simple dictionary lookup for O(1) driver retrieval. Entry points are discovered once on module import via importlib.metadata.

Example: Full Custom Driver

Here’s a complete example integrating Rust transport, skills, and state management:
import rfx
from rfx.drivers import register_driver
from typing import Any
import time

class QuadrupedDriver:
    """Full-featured quadruped robot driver."""

    def __init__(self, robot_id: str = "go2-01"):
        self._robot_id = robot_id
        self._go2 = None  # Will hold _rfx.Go2 instance

    @property
    def name(self) -> str:
        return f"Quadruped-{self._robot_id}"

    @property
    def num_joints(self) -> int:
        return 12

    def connect(self, **kwargs: Any) -> None:
        config = _rfx.Go2Config(kwargs.get("ip_address", "192.168.123.161"))
        config = config.with_backend("zenoh")
        self._go2 = _rfx.Go2.connect(config)

    def disconnect(self) -> None:
        if self._go2:
            self._go2.disconnect()

    def is_connected(self) -> bool:
        return self._go2 is not None and self._go2.is_connected()

    def read_state(self) -> dict[str, Any]:
        if not self._go2:
            return {}
        state = self._go2.state()
        return {
            "tick": state.tick,
            "timestamp": state.timestamp,
            "joint_positions": state.joint_positions(),
            "joint_velocities": state.joint_velocities(),
            "imu": {
                "roll": state.imu.roll,
                "pitch": state.imu.pitch,
                "yaw": state.imu.yaw,
            },
        }

    def send_command(self, command: dict[str, Any]) -> None:
        if not self._go2:
            raise RuntimeError("Not connected")

        cmd_type = command.get("type")
        if cmd_type == "walk":
            self._go2.walk(command["vx"], command["vy"], command["vyaw"])
        elif cmd_type == "stand":
            self._go2.stand()
        elif cmd_type == "sit":
            self._go2.sit()

# Register the driver
register_driver("quadruped", QuadrupedDriver)

Best Practices

Use Type Hints

Fully type-annotate your drivers for better IDE support and runtime validation.

Release GIL

For blocking I/O, use Rust backends that release Python’s GIL automatically.

Error Handling

Raise descriptive exceptions on connection failures or invalid commands.

State Caching

Cache robot state internally and update asynchronously for better performance.

See Also

Build docs developers (and LLMs) love