Overview
The rfx framework is built on a hybrid Python/Rust architecture:
Python layer (rfx/python/rfx/) provides the ergonomic API, neural network integration (via tinygrad), and LLM agent skills
Rust core (rfx/crates/rfx-core/) handles real-time control, hardware communication, and performance-critical math
PyO3 bindings (rfx/crates/rfx-python/) expose Rust types to Python with automatic GIL management
This architecture delivers Python’s ease of use with Rust’s performance and safety guarantees.
Architecture Diagram
Python (rfx)                        Rust (rfx-core)
┌──────────────┐                    ┌──────────────┐
│   tinygrad   │                    │   Control    │
│   Policies   │───actions (np)────►│    Loops     │
└──────────────┘                    └──────────────┘
       │                                   │
┌──────────────┐                    ┌──────────────┐
│    Skills    │                    │   Hardware   │
│   Registry   │                    │    (Go2)     │
└──────────────┘                    └──────────────┘
       │                                   │
┌──────────────────────────────────────────────────┐
│               PyO3 Bindings Layer                │
│  - Automatic type conversion                     │
│  - GIL release for blocking operations           │
│  - Zero-copy shared memory (where possible)      │
└──────────────────────────────────────────────────┘
Rust Workspace Structure
The Rust code is organized as a Cargo workspace:
# Cargo.toml
[workspace]
resolver = "2"
members = [
    "rfx/crates/rfx-core",
    "rfx/crates/rfx-python",
]

[workspace.package]
version = "0.2.0"
edition = "2021"
license = "MIT"

[workspace.dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "sync", "time", "macros"] }
pyo3 = { version = "0.28", features = ["extension-module"] }
serde = { version = "1", features = ["derive"] }
nalgebra = { version = "0.33", default-features = false, features = ["std"] }
crossbeam-channel = "0.5"
parking_lot = "0.12"
zenoh = { version = "1.7", default-features = false, features = ["transport_tcp", "transport_udp", "shared-memory"] }
rfx-core Modules
The core library (rfx/crates/rfx-core/src/lib.rs) is organized into domain-specific modules:
pub mod comm;      // Communication primitives
pub mod control;   // Control loops, PID, state machines
pub mod hardware;  // Hardware abstraction and Go2 client
pub mod math;      // Transform, quaternion, filters
pub mod neural;    // Observation/action space definitions
pub mod node;      // Robot node abstractions
pub mod sim;       // Simulation backend traits
Key Types
Transform (SE3)
/// A rigid body transformation in 3D space
pub struct Transform {
    pub position: [f64; 3],
    pub orientation: Quaternion,
}

impl Transform {
    pub fn compose(&self, other: &Transform) -> Transform { ... }
    pub fn inverse(&self) -> Transform { ... }
    pub fn transform_point(&self, point: [f64; 3]) -> [f64; 3] { ... }
}
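The method bodies above are elided in the source. As a rough illustration only, here is one way they could be filled in using plain std Rust. It assumes the convention p' = R·p + t, that compose applies `other` first and then `self`, and a hypothetical minimal Quaternion type with a `multiply` method; none of these details are confirmed by rfx-core.

```rust
/// Hypothetical stand-in for rfx_core::math::Quaternion (assumed to be unit length).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Quaternion { pub w: f64, pub x: f64, pub y: f64, pub z: f64 }

fn cross(a: [f64; 3], b: [f64; 3]) -> [f64; 3] {
    [a[1] * b[2] - a[2] * b[1], a[2] * b[0] - a[0] * b[2], a[0] * b[1] - a[1] * b[0]]
}
fn add(a: [f64; 3], b: [f64; 3]) -> [f64; 3] { [a[0] + b[0], a[1] + b[1], a[2] + b[2]] }
fn scale(a: [f64; 3], s: f64) -> [f64; 3] { [a[0] * s, a[1] * s, a[2] * s] }

impl Quaternion {
    /// Hamilton product: the resulting rotation applies `o` first, then `self`.
    pub fn multiply(&self, o: &Quaternion) -> Quaternion {
        Quaternion {
            w: self.w * o.w - self.x * o.x - self.y * o.y - self.z * o.z,
            x: self.w * o.x + self.x * o.w + self.y * o.z - self.z * o.y,
            y: self.w * o.y - self.x * o.z + self.y * o.w + self.z * o.x,
            z: self.w * o.z + self.x * o.y - self.y * o.x + self.z * o.w,
        }
    }
    /// Inverse rotation for a unit quaternion.
    pub fn conjugate(&self) -> Quaternion {
        Quaternion { w: self.w, x: -self.x, y: -self.y, z: -self.z }
    }
    /// v' = v + w*t + q_vec x t, where t = 2 * (q_vec x v).
    pub fn rotate_vector(&self, v: [f64; 3]) -> [f64; 3] {
        let q = [self.x, self.y, self.z];
        let t = scale(cross(q, v), 2.0);
        add(add(v, scale(t, self.w)), cross(q, t))
    }
}

pub struct Transform { pub position: [f64; 3], pub orientation: Quaternion }

impl Transform {
    /// Assumed semantics: apply `other` first, then `self`.
    pub fn compose(&self, other: &Transform) -> Transform {
        Transform {
            position: add(self.position, self.orientation.rotate_vector(other.position)),
            orientation: self.orientation.multiply(&other.orientation),
        }
    }
    /// Inverse of p' = R*p + t is p = R^-1 * p' - R^-1 * t.
    pub fn inverse(&self) -> Transform {
        let inv = self.orientation.conjugate();
        Transform { position: scale(inv.rotate_vector(self.position), -1.0), orientation: inv }
    }
    /// Rotate the point, then translate it.
    pub fn transform_point(&self, point: [f64; 3]) -> [f64; 3] {
        add(self.orientation.rotate_vector(point), self.position)
    }
}
```

Under these assumptions, composing a transform with its inverse maps every point back to itself, which makes a convenient sanity check for any real implementation.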
PyO3 Bindings
The rfx-python crate (rfx/crates/rfx-python/src/lib.rs) exposes Rust types to Python:
use pyo3::prelude::*;

#[pymodule]
fn _rfx(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Version info
    m.add("__version__", env!("CARGO_PKG_VERSION"))?;

    // Math types
    m.add_class::<PyQuaternion>()?;
    m.add_class::<PyTransform>()?;
    m.add_class::<PyLowPassFilter>()?;

    // Control types
    m.add_class::<PyPid>()?;
    m.add_class::<PyControlLoopHandle>()?;

    // Hardware types
    m.add_class::<PyGo2>()?;
    m.add_class::<PyGo2Config>()?;
    m.add_class::<PyGo2State>()?;

    // Communication types
    m.add_class::<PyTransport>()?;
    m.add_class::<PyStream>()?;

    // Utility functions
    m.add_function(wrap_pyfunction!(run_control_loop, m)?)?;

    Ok(())
}
Wrapper Types
PyO3 wrapper types bridge Rust and Python:
/// Python wrapper for Quaternion
#[pyclass(name = "Quaternion")]
#[derive(Clone)]
pub struct PyQuaternion {
    inner: rfx_core::math::Quaternion,
}

#[pymethods]
impl PyQuaternion {
    #[new]
    #[pyo3(signature = (w = 1.0, x = 0.0, y = 0.0, z = 0.0))]
    fn new(w: f64, x: f64, y: f64, z: f64) -> Self {
        Self {
            inner: rfx_core::math::Quaternion::new(w, x, y, z),
        }
    }

    #[staticmethod]
    fn from_euler(roll: f64, pitch: f64, yaw: f64) -> Self {
        Self {
            inner: rfx_core::math::Quaternion::from_euler(roll, pitch, yaw),
        }
    }

    fn rotate_vector(&self, v: [f64; 3]) -> [f64; 3] {
        self.inner.rotate_vector(v)
    }

    #[getter]
    fn w(&self) -> f64 {
        self.inner.w
    }

    // ... more methods
}
GIL Management
The bindings release the Python Global Interpreter Lock (GIL) explicitly, via py.detach(), around blocking operations:
#[pymethods]
impl PyGo2 {
    #[staticmethod]
    fn connect(py: Python<'_>, config: PyGo2Config) -> PyResult<Self> {
        // Release GIL during connection
        let result = py.detach(|| rfx_core::hardware::go2::Go2::connect(config.inner));
        match result {
            Ok(go2) => Ok(Self { inner: Arc::new(go2) }),
            Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
        }
    }

    fn walk(&self, py: Python<'_>, vx: f32, vy: f32, vyaw: f32) -> PyResult<()> {
        // Release GIL during I/O
        py.detach(|| self.inner.walk(vx, vy, vyaw))
            .map_err(|e| PyRuntimeError::new_err(e.to_string()))
    }
}
py.detach() releases the GIL, allowing other Python threads to run concurrently. This is critical for real-time control where blocking operations must not stall the interpreter.
Control Loop Implementation
The control loop (rfx/crates/rfx-core/src/control/control_loop.rs) uses high-precision timing:
use std::time::{Duration, Instant};

pub fn run<F>(config: ControlLoopConfig, mut callback: F) -> Result<ControlLoopStats>
where
    F: FnMut(u64, f64) -> bool,
{
    let period = Duration::from_secs_f64(1.0 / config.rate_hz);
    let mut next_tick = Instant::now() + period;
    let mut stats = ControlLoopStats::default();

    loop {
        let start = Instant::now();
        let dt = period.as_secs_f64();

        // Call user callback (may acquire GIL)
        if !callback(stats.iterations, dt) {
            break;
        }
        stats.iterations += 1;

        let elapsed = start.elapsed();

        // Track timing: exponential moving average with weight 0.05 on the newest sample.
        // Duration has no mul_add, so the two terms are scaled and summed explicitly.
        stats.avg_iteration_time =
            stats.avg_iteration_time.mul_f64(0.95) + elapsed.mul_f64(0.05);
        stats.max_iteration_time = stats.max_iteration_time.max(elapsed);

        // Sleep until next tick
        if let Some(sleep_dur) = next_tick.checked_duration_since(Instant::now()) {
            spin_sleep::sleep(sleep_dur);
        } else {
            stats.overruns += 1;
        }
        next_tick += period;
    }

    Ok(stats)
}
The control loop uses spin_sleep for sub-millisecond precision. It tracks overruns when the callback takes longer than the loop period.
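To make the overrun accounting concrete, the following sketch replays the same scheduling rule on a simulated clock instead of real time. The function name `simulate_overruns` and its return shape are hypothetical and not part of rfx-core; only the deadline logic (sleep if the deadline is still ahead, otherwise count an overrun, then advance the deadline by one period) mirrors the loop above.

```rust
use std::time::Duration;

/// Replays the loop's deadline logic on a simulated clock.
/// Returns (number of overruns, total time that would have been slept).
pub fn simulate_overruns(period: Duration, callback_durations: &[Duration]) -> (u64, Duration) {
    let mut now = Duration::ZERO; // simulated time since the loop started
    let mut next_tick = period;   // deadline of the first iteration
    let mut overruns = 0;
    let mut slept = Duration::ZERO;

    for &work in callback_durations {
        now += work; // the callback runs for `work`
        match next_tick.checked_sub(now) {
            // Deadline not yet passed: sleep the remainder.
            Some(remaining) => {
                slept += remaining;
                now = next_tick;
            }
            // Deadline already passed: count an overrun and do not sleep.
            None => overruns += 1,
        }
        // The deadline always advances by one period; it is never reset to `now`.
        next_tick += period;
    }
    (overruns, slept)
}
```

With a 10 ms period and callback durations of 2, 25, 3, 3, and 3 ms, this returns 3 overruns and 14 ms of sleep. Because the deadline is advanced rather than reset, a single slow callback is counted as several overruns while the loop catches up.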
Communication Layer
The comm module provides zero-copy transport via Zenoh:
/// Keyed transport backend (in-process or Zenoh)
pub trait TransportBackend: Send + Sync {
    fn publish(&self, key: &str, payload: Arc<[u8]>, metadata: Option<Arc<str>>) -> TransportEnvelope;
    fn subscribe(&self, pattern: &str, capacity: usize) -> TransportSubscription;
    fn unsubscribe(&self, subscription_id: u64) -> bool;
}

/// Zenoh-backed transport with shared memory
pub struct ZenohTransport {
    session: Arc<zenoh::Session>,
    config: ZenohTransportConfig,
    subscriptions: Arc<RwLock<HashMap<u64, TransportSubscription>>>,
}

impl TransportBackend for ZenohTransport {
    fn publish(&self, key: &str, payload: Arc<[u8]>, metadata: Option<Arc<str>>) -> TransportEnvelope {
        // Zero-copy publish via Zenoh shared memory
        let full_key = format!("{}{}", self.config.key_prefix, key);
        self.session.put(&full_key, payload.as_ref()).wait().ok();

        TransportEnvelope {
            key: Arc::from(key),
            payload,
            metadata_json: metadata,
            sequence: self.next_sequence(),
            timestamp_ns: self.now_ns(),
        }
    }
}
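The trait comment mentions an in-process backend alongside Zenoh. The sketch below shows what a minimal in-process variant might look like using only the standard library. Everything here is hypothetical: the `InProcessTransport` and `Envelope` types, the simplified method signatures (no metadata, capacity, or unsubscribe), and the pattern rule, which treats a trailing `/**` as a prefix wildcard and everything else as an exact match.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Message delivered to subscribers; the payload is shared, not copied.
#[derive(Clone, Debug)]
pub struct Envelope {
    pub key: Arc<str>,
    pub payload: Arc<[u8]>,
    pub sequence: u64,
}

#[derive(Default)]
struct Inner {
    next_sequence: u64,
    subscribers: Vec<(String, Sender<Envelope>)>,
}

/// Hypothetical in-process backend: fans messages out over std channels.
#[derive(Default)]
pub struct InProcessTransport {
    inner: Mutex<Inner>,
}

/// Simplified matching: "a/**" matches "a" and anything under "a/".
fn key_matches(pattern: &str, key: &str) -> bool {
    match pattern.strip_suffix("/**") {
        Some(prefix) => key == prefix || key.starts_with(&format!("{prefix}/")),
        None => pattern == key,
    }
}

impl InProcessTransport {
    pub fn subscribe(&self, pattern: &str) -> Receiver<Envelope> {
        let (tx, rx) = channel();
        self.inner.lock().unwrap().subscribers.push((pattern.to_string(), tx));
        rx
    }

    pub fn publish(&self, key: &str, payload: Arc<[u8]>) -> Envelope {
        let mut inner = self.inner.lock().unwrap();
        let envelope = Envelope { key: Arc::from(key), payload, sequence: inner.next_sequence };
        inner.next_sequence += 1;
        // Deliver to matching subscribers; drop any whose receiver has gone away.
        inner.subscribers.retain(|(pattern, tx)| {
            !key_matches(pattern, key) || tx.send(envelope.clone()).is_ok()
        });
        envelope
    }
}
```

Cloning the envelope for each subscriber only bumps reference counts on `key` and `payload`, so every receiver sees the same underlying byte buffer.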
Hardware Abstraction
The Go2 hardware interface (rfx/crates/rfx-core/src/hardware/go2/mod.rs) abstracts DDS backends:
pub trait DdsBackend {
    fn publish_low_cmd(&self, cmd: &LowCmd) -> Result<()>;
    fn publish_sport_cmd(&self, cmd: &SportModeCmd) -> Result<()>;
    fn subscribe_state(&self) -> Receiver<LowState>;
    fn is_connected(&self) -> bool;
    fn disconnect(&self);
}

pub enum Go2BackendHint {
    Zenoh,      // Zenoh transport via zenoh-bridge-dds
    CycloneDds, // Native CycloneDDS backend
    DustDds,    // Pure Rust dust-dds backend
}

pub struct Go2 {
    config: Go2Config,
    state: Arc<RwLock<Go2State>>,
    backend: Arc<dyn DdsBackend>,
    connected: Arc<AtomicBool>,
}
Zenoh is the recommended backend for distributed robotics. It provides zero-copy shared memory, automatic discovery, and peer-to-peer routing.
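One benefit of hiding the transport behind a trait object is that the robot interface can be exercised without hardware. The sketch below illustrates this with a mock backend. It is a simplified, hypothetical version: the trait is trimmed to four methods, errors are plain strings, and the `SportModeCmd`, `LowState`, `MockBackend`, and `Go2` definitions here are stand-ins rather than the real rfx-core types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

// Hypothetical, heavily reduced message types.
#[derive(Clone, Debug, PartialEq)]
pub struct SportModeCmd { pub vx: f32, pub vy: f32, pub vyaw: f32 }
#[derive(Clone, Debug, PartialEq)]
pub struct LowState { pub tick: u64 }

/// Trimmed-down version of the backend trait, for illustration only.
pub trait DdsBackend: Send + Sync {
    fn publish_sport_cmd(&self, cmd: &SportModeCmd) -> Result<(), String>;
    fn subscribe_state(&self) -> Receiver<LowState>;
    fn is_connected(&self) -> bool;
    fn disconnect(&self);
}

/// Records commands instead of sending them to a robot.
pub struct MockBackend {
    connected: AtomicBool,
    sent: Mutex<Vec<SportModeCmd>>,
    state_txs: Mutex<Vec<Sender<LowState>>>,
}

impl MockBackend {
    pub fn new() -> Self {
        Self {
            connected: AtomicBool::new(true),
            sent: Mutex::new(Vec::new()),
            state_txs: Mutex::new(Vec::new()),
        }
    }
    /// Commands published so far.
    pub fn sent_cmds(&self) -> Vec<SportModeCmd> { self.sent.lock().unwrap().clone() }
    /// Inject a fake state message to all subscribers.
    pub fn push_state(&self, state: LowState) {
        for tx in self.state_txs.lock().unwrap().iter() {
            let _ = tx.send(state.clone());
        }
    }
}

impl DdsBackend for MockBackend {
    fn publish_sport_cmd(&self, cmd: &SportModeCmd) -> Result<(), String> {
        if !self.is_connected() {
            return Err("backend disconnected".to_string());
        }
        self.sent.lock().unwrap().push(cmd.clone());
        Ok(())
    }
    fn subscribe_state(&self) -> Receiver<LowState> {
        let (tx, rx) = channel();
        self.state_txs.lock().unwrap().push(tx);
        rx
    }
    fn is_connected(&self) -> bool { self.connected.load(Ordering::SeqCst) }
    fn disconnect(&self) { self.connected.store(false, Ordering::SeqCst); }
}

/// Robot handle that only knows about the trait, not the concrete backend.
pub struct Go2 { backend: Arc<dyn DdsBackend> }

impl Go2 {
    pub fn new(backend: Arc<dyn DdsBackend>) -> Self { Self { backend } }
    pub fn walk(&self, vx: f32, vy: f32, vyaw: f32) -> Result<(), String> {
        self.backend.publish_sport_cmd(&SportModeCmd { vx, vy, vyaw })
    }
}
```

A test can then hold its own `Arc<MockBackend>` alongside the `Go2` handle, inspect `sent_cmds()` after calling `walk`, and feed state updates through `push_state`.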
Type Conversions
PyO3 automatically converts between Python and Rust types:
Python Type    Rust Type
float          f64
int            i64, u64, usize
bool           bool
str            String, &str
bytes          Vec<u8>, &[u8]
list           Vec<T>
dict           HashMap<K, V>
tuple          (T1, T2, ...)
Custom types use #[pyclass] for automatic conversion:
import _rfx

# Create Rust type from Python
quat = _rfx.Quaternion.from_euler(0.1, 0.2, 0.3)
print(quat.w, quat.x, quat.y, quat.z)  # Direct attribute access

# Call Rust methods
rotated = quat.rotate_vector([1.0, 0.0, 0.0])
print(rotated)  # The rotated vector, as a list of three floats
Zero-Copy Where Possible: Use Arc<[u8]> for payloads to avoid copying data between Python and Rust.
Release GIL Eagerly: Always use py.detach() for I/O or CPU-intensive operations that do not touch Python objects.
Prefer Rust for Hot Loops: Implement control loops in Rust; keep Python for high-level orchestration.
Use Lock-Free Structures: crossbeam-channel (lock-free channels) and parking_lot (fast mutexes) minimize contention.
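As a small illustration of the Arc<[u8]> guidance, the sketch below fans one payload out to several threads and checks that every thread reads the same allocation. It covers only the Rust side of the story; how bytes cross the Python boundary is not shown. The function name `fan_out` is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

/// Shares one payload with `n` worker threads without copying the bytes.
/// Each worker returns its checksum and whether it saw the original allocation.
pub fn fan_out(payload: Arc<[u8]>, n: usize) -> Vec<(u64, bool)> {
    // Address of the first byte, kept as an integer so it can cross threads.
    let base = payload.as_ptr() as usize;
    let handles: Vec<_> = (0..n)
        .map(|_| {
            // Arc::clone bumps a reference count; the byte buffer is not duplicated.
            let shared = Arc::clone(&payload);
            thread::spawn(move || {
                let sum: u64 = shared.iter().map(|&b| b as u64).sum();
                (sum, shared.as_ptr() as usize == base)
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Passing a `Vec<u8>` by value to each thread would instead require one full copy per worker.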
Error Handling
Rust errors are converted to Python exceptions:
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("Connection error: {0}")]
    Connection(String),
    #[error("Hardware error: {0}")]
    Hardware(String),
    #[error("Timeout: {0}")]
    Timeout(String),
}

pub type Result<T> = std::result::Result<T, Error>;

// In PyO3 bindings
fn connect(config: PyGo2Config) -> PyResult<Self> {
    rfx_core::hardware::go2::Go2::connect(config.inner)
        .map(|go2| Self { inner: Arc::new(go2) })
        .map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
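To show what the Python side ends up seeing, the sketch below reproduces the error enum with a hand-written Display impl (which is roughly what the thiserror attributes generate) and follows an error through `?` propagation to the final message string. The `connect`, `read_state`, `connect_and_read`, and `to_python_message` functions are hypothetical helpers with made-up failure conditions; the last one stands in for the `map_err` step that feeds `PyRuntimeError::new_err`.

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
pub enum Error {
    Connection(String),
    Hardware(String),
    Timeout(String),
}

// Hand-written equivalent of the #[error("...")] attributes.
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Connection(m) => write!(f, "Connection error: {m}"),
            Error::Hardware(m) => write!(f, "Hardware error: {m}"),
            Error::Timeout(m) => write!(f, "Timeout: {m}"),
        }
    }
}

impl std::error::Error for Error {}

/// Hypothetical: connecting fails for an empty address, otherwise yields a handle.
fn connect(addr: &str) -> Result<u32, Error> {
    if addr.is_empty() {
        Err(Error::Connection("empty address".to_string()))
    } else {
        Ok(1)
    }
}

/// Hypothetical: reading fails when the timeout is zero.
fn read_state(handle: u32, timeout_ms: u64) -> Result<f64, Error> {
    if timeout_ms == 0 {
        Err(Error::Timeout(format!("no state from handle {handle}")))
    } else {
        Ok(0.0)
    }
}

/// `?` returns early with the first error, keeping its variant intact.
pub fn connect_and_read(addr: &str, timeout_ms: u64) -> Result<f64, Error> {
    let handle = connect(addr)?;
    read_state(handle, timeout_ms)
}

/// Stand-in for the binding layer: the error is flattened to its Display string.
pub fn to_python_message(result: Result<f64, Error>) -> Result<f64, String> {
    result.map_err(|e| e.to_string())
}
```

Note that flattening to a string discards the variant, so Python code cannot distinguish a timeout from a connection failure except by inspecting the message text.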
Building from Source
The Rust extension is built via maturin:
# Development build (editable install)
maturin develop --manifest-path rfx/crates/rfx-python/Cargo.toml
# Release build
maturin build --release --manifest-path rfx/crates/rfx-python/Cargo.toml
# Install wheel
pip install target/wheels/*.whl
maturin develop creates an editable install for rapid iteration. Changes to Rust code require rebuilding, but Python changes are immediate.
Testing
Rust tests run with cargo test:
# Test rfx-core
cargo test --package rfx-core
# Test with specific features
cargo test --package rfx-core --features zenoh
# Run benchmarks
cargo bench --package rfx-core
Python integration tests use pytest.
Dependency Graph
rfx-python (PyO3 bindings)
├── rfx-core (Rust library)
│ ├── nalgebra (linear algebra)
│ ├── tokio (async runtime)
│ ├── zenoh (transport)
│ ├── crossbeam-channel (lock-free channels)
│ └── parking_lot (fast mutexes)
└── pyo3 (Python bindings)
Best Practices
When to Use Rust vs. Python
Use Rust for:
Real-time control loops (< 1ms latency)
Hardware communication (DDS, serial, Zenoh)
Performance-critical math (kinematics, filtering)
Lock-free concurrency
Use Python for:
High-level orchestration and scripting
Neural network training (tinygrad)
LLM agent integration (skills)
Rapid prototyping
Always release GIL for blocking I/O (py.detach())
Minimize Python calls inside hot loops
Use Rust channels to pass data between threads
Avoid acquiring GIL in Rust callbacks if possible
Use Arc for shared ownership across threads
Prefer &[u8] over Vec<u8> for read-only data
Use parking_lot::RwLock for shared mutable state
Avoid cloning large structures; use references
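The threading guidelines above can be sketched with the standard library alone. This example uses std::sync::mpsc and std::sync::RwLock as stand-ins for crossbeam-channel and parking_lot::RwLock (whose APIs differ slightly, for example parking_lot locks do not return a poisoning Result). The `RobotState` type and `run_pipeline` function are hypothetical.

```rust
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct RobotState {
    pub tick: u64,
    pub height: f64,
}

/// A producer thread sends `n` states over a channel; a consumer thread writes
/// each one into shared state. Returns the final snapshot.
pub fn run_pipeline(n: u64) -> RobotState {
    // Arc gives shared ownership; RwLock guards the mutable state.
    let shared = Arc::new(RwLock::new(RobotState::default()));
    let (tx, rx) = mpsc::channel::<RobotState>();

    let producer = thread::spawn(move || {
        for tick in 1..=n {
            tx.send(RobotState { tick, height: 0.3 }).unwrap();
        }
        // `tx` is dropped here, which closes the channel and ends the consumer loop.
    });

    let writer_state = Arc::clone(&shared);
    let consumer = thread::spawn(move || {
        for state in rx {
            // Hold the write lock only for the duration of the assignment.
            *writer_state.write().unwrap() = state;
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();

    // Copy the value out so the read guard is released before returning.
    let snapshot = *shared.read().unwrap();
    snapshot
}
```

Keeping the data flow on a channel means the producer never touches the lock, so only the consumer and any readers contend for it.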