Overview
The rfx driver system allows you to extend the framework with custom robot interfaces. Drivers are registered via the global registry and can be discovered through entry points or explicit registration.
Driver Protocol
All robot drivers must implement the RobotDriver protocol:
from typing import Protocol, Any, runtime_checkable
@runtime_checkable
class RobotDriver ( Protocol ):
"""Protocol that all robot drivers must implement."""
def connect ( self , ** kwargs : Any) -> None : ...
def disconnect ( self ) -> None : ...
def is_connected ( self ) -> bool : ...
def read_state ( self ) -> dict[ str , Any]: ...
def send_command ( self , command : dict[ str , Any]) -> None : ...
@ property
def name ( self ) -> str : ...
@ property
def num_joints ( self ) -> int : ...
The @runtime_checkable decorator enables runtime type checking with isinstance(). All drivers are validated against this protocol at registration time.
Creating a Custom Driver
Implement the RobotDriver protocol to create a new robot interface:
import rfx
from typing import Any
class MyRobotDriver :
"""Custom driver for MyRobot platform."""
def __init__ ( self , robot_id : str = "robot-01" ):
self ._robot_id = robot_id
self ._connected = False
self ._state = {}
@ property
def name ( self ) -> str :
return f "MyRobot- { self ._robot_id } "
@ property
def num_joints ( self ) -> int :
return 12 # Example: 12-DOF robot
def connect ( self , ** kwargs : Any) -> None :
"""Establish connection to the robot."""
ip_address = kwargs.get( "ip_address" , "192.168.1.100" )
# Your connection logic here
self ._connected = True
print ( f "Connected to { self .name } at { ip_address } " )
def disconnect ( self ) -> None :
"""Close connection to the robot."""
# Your disconnection logic here
self ._connected = False
def is_connected ( self ) -> bool :
"""Check if robot is connected."""
return self ._connected
def read_state ( self ) -> dict[ str , Any]:
"""Read current robot state."""
# Your state reading logic here
return {
"joint_positions" : [ 0.0 ] * self .num_joints,
"joint_velocities" : [ 0.0 ] * self .num_joints,
"imu" : { "roll" : 0.0 , "pitch" : 0.0 , "yaw" : 0.0 },
}
def send_command ( self , command : dict[ str , Any]) -> None :
"""Send command to the robot."""
# Your command sending logic here
if not self ._connected:
raise RuntimeError ( "Robot not connected" )
print ( f "Sending command: { command } " )
Registering Drivers
Explicit Registration
Register your driver directly with the register_driver function:
import rfx
from rfx.drivers import register_driver
# Register the driver factory
register_driver( "myrobot" , MyRobotDriver)
# Verify registration
available = rfx.drivers.list_drivers()
print ( f "Available drivers: { available } " )
# Output: Available drivers: ['myrobot']
Get Driver Factory
List All Drivers
from rfx.drivers import get_driver
# Retrieve the driver class
DriverClass = get_driver( "myrobot" )
if DriverClass:
robot = DriverClass( robot_id = "alpha-01" )
robot.connect( ip_address = "192.168.1.100" )
Entry Point Registration
For distributable packages, use entry points in pyproject.toml:
[ project . entry-points . "rfx . drivers" ]
myrobot = "myrobot_package:MyRobotDriver"
custom_arm = "myarm_package.drivers:ArmDriver"
Drivers registered via entry points are auto-discovered on import:
import rfx # Auto-discovers all rfx.drivers entry points
# Your driver is now available
driver = rfx.drivers.get_driver( "myrobot" )
Integration with Rust Core
For performance-critical drivers, implement the core logic in Rust and expose via PyO3:
import _rfx # The compiled Rust extension
class RustBackedDriver :
"""Driver with Rust-based communication backend."""
def __init__ ( self ):
# Use Rust types for high-performance I/O
self ._transport = _rfx.Transport.zenoh(
connect = [ "tcp/192.168.1.100:7447" ],
shared_memory = True
)
self ._state_sub = self ._transport.subscribe( "robot/state" )
def read_state ( self ) -> dict[ str , Any]:
"""Read state using zero-copy Rust transport."""
envelope = self ._state_sub.try_recv()
if envelope:
# Parse payload from Rust
import json
return json.loads(envelope.payload.decode())
return {}
The Rust core handles DDS communication, control loops, and real-time math. Python drivers wrap Rust types via PyO3 bindings for optimal performance.
Driver Registry Internals
The driver registry is implemented in rfx/python/rfx/drivers.py:14-73:
# Global driver registry
_DRIVER_REGISTRY : dict[ str , type ] = {}
def register_driver ( name : str , factory : type ) -> None :
"""Register a driver factory by name."""
_DRIVER_REGISTRY [name] = factory
logger.debug( "Registered driver: %s " , name)
def get_driver ( name : str ) -> type | None :
"""Get a registered driver factory by name."""
return _DRIVER_REGISTRY .get(name)
The registry uses simple dictionary lookup for O(1) driver retrieval. Entry points are discovered once on module import via importlib.metadata.
Example: Full Custom Driver
Here’s a complete example integrating Rust transport, skills, and state management:
import rfx
from rfx.drivers import register_driver
from typing import Any
import time
class QuadrupedDriver :
"""Full-featured quadruped robot driver."""
def __init__ ( self , robot_id : str = "go2-01" ):
self ._robot_id = robot_id
self ._go2 = None # Will hold _rfx.Go2 instance
@ property
def name ( self ) -> str :
return f "Quadruped- { self ._robot_id } "
@ property
def num_joints ( self ) -> int :
return 12
def connect ( self , ** kwargs : Any) -> None :
config = _rfx.Go2Config(kwargs.get( "ip_address" , "192.168.123.161" ))
config = config.with_backend( "zenoh" )
self ._go2 = _rfx.Go2.connect(config)
def disconnect ( self ) -> None :
if self ._go2:
self ._go2.disconnect()
def is_connected ( self ) -> bool :
return self ._go2 is not None and self ._go2.is_connected()
def read_state ( self ) -> dict[ str , Any]:
if not self ._go2:
return {}
state = self ._go2.state()
return {
"tick" : state.tick,
"timestamp" : state.timestamp,
"joint_positions" : state.joint_positions(),
"joint_velocities" : state.joint_velocities(),
"imu" : {
"roll" : state.imu.roll,
"pitch" : state.imu.pitch,
"yaw" : state.imu.yaw,
},
}
def send_command ( self , command : dict[ str , Any]) -> None :
if not self ._go2:
raise RuntimeError ( "Not connected" )
cmd_type = command.get( "type" )
if cmd_type == "walk" :
self ._go2.walk(command[ "vx" ], command[ "vy" ], command[ "vyaw" ])
elif cmd_type == "stand" :
self ._go2.stand()
elif cmd_type == "sit" :
self ._go2.sit()
# Register the driver
register_driver( "quadruped" , QuadrupedDriver)
Best Practices
Use Type Hints Fully type-annotate your drivers for better IDE support and runtime validation.
Release GIL For blocking I/O, use Rust backends that release Python’s GIL automatically.
Error Handling Raise descriptive exceptions on connection failures or invalid commands.
State Caching Cache robot state internally and update asynchronously for better performance.
See Also