The control loop connects a policy to a robot with rate control, jitter tracking, error handling, and clean shutdown.
Quick Start
The simplest way to run a policy:
import rfx
@rfx.policy
def my_policy ( obs ):
return obs[ "state" ] # Mirror current state
robot = rfx.MockRobot( state_dim = 12 , action_dim = 6 )
stats = rfx.run(robot, my_policy, rate_hz = 50 , duration = 10.0 )
print ( f "Completed { stats.iterations } steps" )
print ( f "Average period: { stats.avg_period_s * 1000 :.2f} ms" )
Source: rfx/python/rfx/session.py:258-279
The run() Function
One-liner to connect policy to robot:
def run (
robot : Robot,
policy : Callable[[dict[ str , torch.Tensor]], torch.Tensor],
rate_hz : float = 50 ,
duration : float | None = None ,
warmup_s : float = 0.5 ,
) -> SessionStats:
"""Run inference loop and return timing stats."""
Parameters
Parameter Type Default Description robotRobotrequired Any object with observe(), act(), reset() policyCallablerequired Maps observation dict to action tensor rate_hzfloat50Target control loop frequency in Hz durationfloat | NoneNoneRun time in seconds (None = infinite, Ctrl+C to stop) warmup_sfloat0.5Seconds to sleep after reset before starting
Returns
SessionStats with timing and jitter information.
Session Class
For more control over the lifecycle:
import rfx
robot = rfx.MockRobot( state_dim = 12 , action_dim = 6 )
with rfx.Session(robot, my_policy, rate_hz = 50 ) as session:
session.run( duration = 10.0 )
print ( f "Steps: { session.step_count } " )
print ( f "Stats: { session.stats } " )
Source: rfx/python/rfx/session.py:61-77
Constructor
class Session :
def __init__ (
self ,
robot : Robot,
policy : Callable[[dict[ str , torch.Tensor]], torch.Tensor],
rate_hz : float = 50 ,
warmup_s : float = 0.5 ,
):
...
Methods
session = rfx.Session(robot, policy, rate_hz = 50 )
# Start the control loop (spawns daemon thread)
session.start()
# Stop the control loop (joins thread)
session.stop()
# Blocking run (combines start + wait + stop)
session.run( duration = 10.0 ) # or None for infinite
# Check for errors from control thread
session.check_health() # Raises if loop failed
Source: rfx/python/rfx/session.py:148-205
Properties
session = rfx.Session(robot, policy, rate_hz = 50 )
session.start()
# Current iteration count
print (session.step_count) # e.g., 1542
# Is control loop running?
print (session.is_running) # True
# Timing statistics
stats = session.stats
print (stats.avg_period_s)
print (stats.p95_jitter_s)
Source: rfx/python/rfx/session.py:104-141
SessionStats
Timing summary for jitter and overrun analysis:
@dataclass ( frozen = True )
class SessionStats :
iterations: int # Total control loop iterations
overruns: int # Number of times loop exceeded target period
target_period_s: float # 1.0 / rate_hz
avg_period_s: float # Average actual period
p50_jitter_s: float # Median jitter
p95_jitter_s: float # 95th percentile jitter
p99_jitter_s: float # 99th percentile jitter
max_jitter_s: float # Maximum jitter observed
Source: rfx/python/rfx/session.py:32-55
Example Output
import rfx
robot = rfx.MockRobot( state_dim = 12 , action_dim = 6 )
stats = rfx.run(robot, my_policy, rate_hz = 50 , duration = 5.0 )
print (stats.iterations) # 250
print (stats.overruns) # 0
print (stats.target_period_s) # 0.02 (50 Hz)
print (stats.avg_period_s) # 0.0201
print (stats.p50_jitter_s) # 0.0001
print (stats.p95_jitter_s) # 0.0003
print (stats.p99_jitter_s) # 0.0005
print (stats.max_jitter_s) # 0.0008
Converting to Dict
stats = rfx.run(robot, policy, rate_hz = 50 , duration = 10.0 )
data = stats.to_dict()
import json
print (json.dumps(data, indent = 2 ))
# {
# "iterations": 500,
# "overruns": 0,
# "target_period_s": 0.02,
# "avg_period_s": 0.0201,
# "p50_jitter_s": 0.0001,
# "p95_jitter_s": 0.0003,
# "p99_jitter_s": 0.0005,
# "max_jitter_s": 0.0008
# }
Source: rfx/python/rfx/session.py:45-55
Control Loop Internals
The control thread runs this loop:
while running:
loop_start = time.perf_counter()
# 1. Observe
obs = robot.observe()
# 2. Compute action (no gradients)
with torch.no_grad():
action = policy(obs)
# 3. Execute
robot.act(action)
# 4. Sleep until next deadline
sleep_until(next_deadline)
next_deadline += target_period
Source: rfx/python/rfx/session.py:207-249
Hybrid Sleep Strategy
For precise timing, the loop uses hybrid sleep:
sleep_s = next_deadline - time.perf_counter()
if sleep_s > 0 :
# Coarse sleep for most of the wait
spin_window_s = 0.0012 # ~1.2ms
if sleep_s > spin_window_s:
time.sleep(sleep_s - spin_window_s)
# Busy spin for last ~1.2ms (more precise)
while time.perf_counter() < next_deadline:
pass
Source: rfx/python/rfx/session.py:238-245
This achieves sub-millisecond timing accuracy on most systems without requiring real-time OS patches.
Usage Patterns
Infinite Loop
import rfx
robot = rfx.RealRobot.from_config(rfx. SO101_CONFIG , port = "/dev/ttyACM0" )
# Run until Ctrl+C
stats = rfx.run(robot, my_policy, rate_hz = 50 , duration = None )
print ( f "Stopped after { stats.iterations } steps" )
Timed Run
import rfx
robot = rfx.MockRobot( state_dim = 12 , action_dim = 6 )
# Run for exactly 10 seconds
stats = rfx.run(robot, my_policy, rate_hz = 50 , duration = 10.0 )
print ( f "Completed { stats.iterations } steps in 10 seconds" )
Manual Control
import rfx
import time
robot = rfx.MockRobot( state_dim = 12 , action_dim = 6 )
session = rfx.Session(robot, my_policy, rate_hz = 50 )
# Start loop
session.start()
try :
# Do other work while loop runs
while session.step_count < 1000 :
time.sleep( 0.1 )
session.check_health() # Raise if loop failed
finally :
session.stop()
print ( f "Final stats: { session.stats } " )
Error Handling
import rfx
robot = rfx.MockRobot( state_dim = 12 , action_dim = 6 )
try :
stats = rfx.run(robot, my_policy, rate_hz = 50 , duration = 10.0 )
except KeyboardInterrupt :
print ( "Stopped by user" )
except RuntimeError as e:
print ( f "Control loop failed: { e } " )
Source: rfx/python/rfx/session.py:143-146
Rust Control Loop
For ultra-low latency, use the Rust control loop:
import rfx
# Requires Rust extension to be built
if rfx.run_control_loop is not None :
handle = rfx.run_control_loop(
robot = robot,
policy = policy,
rate_hz = 1000 , # Can run at kHz rates
)
# Loop runs in Rust thread
handle.wait( duration = 10.0 )
# Get stats
stats = handle.stats()
print (stats.avg_period_s)
else :
print ( "Rust extension not available" )
Source: rfx/python/rfx/__init__.py:22
The Rust control loop can achieve less than 100μs jitter on real-time kernels, ideal for high-frequency control (1kHz+).
Best Practices
Different robots have different optimal control rates: # Arm robots: 20-50 Hz is typical
rfx.run(so101_robot, policy, rate_hz = 50 )
# Legged robots: 100-500 Hz for stable walking
rfx.run(go2_robot, policy, rate_hz = 200 )
# High-precision tasks: 100-200 Hz
rfx.run(robot, precise_policy, rate_hz = 100 )
Monitor stats.overruns to ensure your policy is fast enough.
Use warmup_s for robot initialization
Give the robot time to settle after reset: # Default 0.5s warmup
stats = rfx.run(robot, policy, rate_hz = 50 , warmup_s = 0.5 )
# Longer warmup for heavy robots
stats = rfx.run(humanoid, policy, rate_hz = 100 , warmup_s = 2.0 )
# No warmup for simulation
stats = rfx.run(sim_robot, policy, rate_hz = 50 , warmup_s = 0.0 )
Monitor jitter for stability
High jitter indicates an overloaded system: stats = rfx.run(robot, policy, rate_hz = 50 , duration = 10.0 )
# Check jitter
if stats.p95_jitter_s > 0.005 : # > 5ms
print ( "WARNING: High jitter detected" )
print ( f "p95: { stats.p95_jitter_s * 1000 :.2f} ms" )
print ( f "p99: { stats.p99_jitter_s * 1000 :.2f} ms" )
# Check overruns
overrun_rate = stats.overruns / stats.iterations
if overrun_rate > 0.01 : # > 1%
print ( f "WARNING: { overrun_rate * 100 :.1f} % overruns" )
print ( "Consider: lower rate_hz or optimize policy" )
Use context manager for cleanup
Ensure proper shutdown even on errors: with rfx.Session(robot, policy, rate_hz = 50 ) as session:
session.run( duration = 10.0 )
# Session.stop() called automatically on exit
Source: rfx/python/rfx/session.py:250-255
Example: Complete Deploy Script
#!/usr/bin/env python3
"""Deploy policy to robot with logging and error handling."""
import argparse
import torch
import rfx
@rfx.policy
def my_policy ( obs ):
return obs[ "state" ] # Simple hold position
def main ():
parser = argparse.ArgumentParser()
parser.add_argument( "--mock" , action = "store_true" )
parser.add_argument( "--rate" , type = float , default = 50 )
parser.add_argument( "--duration" , type = float , default = None )
args = parser.parse_args()
# Create robot
if args.mock:
robot = rfx.MockRobot( state_dim = 12 , action_dim = 6 )
else :
robot = rfx.RealRobot.from_config(
rfx. SO101_CONFIG ,
port = "/dev/ttyACM0"
)
print ( f "Robot: { robot } " )
print ( f "Running at { args.rate } Hz..." )
try :
# Run control loop
stats = rfx.run(
robot,
my_policy,
rate_hz = args.rate,
duration = args.duration
)
# Report results
print ( f " \n Completed { stats.iterations } steps" )
print ( f "Overruns: { stats.overruns } " )
print ( f "Avg period: { stats.avg_period_s * 1000 :.2f} ms" )
print ( f "Jitter p50: { stats.p50_jitter_s * 1000 :.2f} ms" )
print ( f "Jitter p95: { stats.p95_jitter_s * 1000 :.2f} ms" )
except KeyboardInterrupt :
print ( " \n Stopped by user" )
except Exception as e:
print ( f " \n Error: { e } " )
raise
if __name__ == "__main__" :
main()
Source: rfx/examples/deploy_real.py:75-136
Robot Interface Understand the Robot protocol
Policies Write policies for the control loop
Observations Work with observation dictionaries