Skip to main content
The control loop connects a policy to a robot with rate control, jitter tracking, error handling, and clean shutdown.

Quick Start

The simplest way to run a policy:
import rfx

@rfx.policy
def my_policy(obs):
    return obs["state"]  # Mirror current state

robot = rfx.MockRobot(state_dim=12, action_dim=6)
stats = rfx.run(robot, my_policy, rate_hz=50, duration=10.0)

print(f"Completed {stats.iterations} steps")
print(f"Average period: {stats.avg_period_s * 1000:.2f} ms")
Source: rfx/python/rfx/session.py:258-279

The run() Function

One-liner to connect policy to robot:
def run(
    robot: Robot,
    policy: Callable[[dict[str, torch.Tensor]], torch.Tensor],
    rate_hz: float = 50,
    duration: float | None = None,
    warmup_s: float = 0.5,
) -> SessionStats:
    """Run inference loop and return timing stats."""

Parameters

ParameterTypeDefaultDescription
robotRobotrequiredAny object with observe(), act(), reset()
policyCallablerequiredMaps observation dict to action tensor
rate_hzfloat50Target control loop frequency in Hz
durationfloat | NoneNoneRun time in seconds (None = infinite, Ctrl+C to stop)
warmup_sfloat0.5Seconds to sleep after reset before starting

Returns

SessionStats with timing and jitter information.

Session Class

For more control over the lifecycle:
import rfx

robot = rfx.MockRobot(state_dim=12, action_dim=6)

with rfx.Session(robot, my_policy, rate_hz=50) as session:
    session.run(duration=10.0)
    print(f"Steps: {session.step_count}")
    print(f"Stats: {session.stats}")
Source: rfx/python/rfx/session.py:61-77

Constructor

class Session:
    def __init__(
        self,
        robot: Robot,
        policy: Callable[[dict[str, torch.Tensor]], torch.Tensor],
        rate_hz: float = 50,
        warmup_s: float = 0.5,
    ):
        ...

Methods

session = rfx.Session(robot, policy, rate_hz=50)

# Start the control loop (spawns daemon thread)
session.start()

# Stop the control loop (joins thread)
session.stop()

# Blocking run (combines start + wait + stop)
session.run(duration=10.0)  # or None for infinite

# Check for errors from control thread
session.check_health()  # Raises if loop failed
Source: rfx/python/rfx/session.py:148-205

Properties

session = rfx.Session(robot, policy, rate_hz=50)
session.start()

# Current iteration count
print(session.step_count)  # e.g., 1542

# Is control loop running?
print(session.is_running)  # True

# Timing statistics
stats = session.stats
print(stats.avg_period_s)
print(stats.p95_jitter_s)
Source: rfx/python/rfx/session.py:104-141

SessionStats

Timing summary for jitter and overrun analysis:
@dataclass(frozen=True)
class SessionStats:
    iterations: int          # Total control loop iterations
    overruns: int           # Number of times loop exceeded target period
    target_period_s: float  # 1.0 / rate_hz
    avg_period_s: float     # Average actual period
    p50_jitter_s: float     # Median jitter
    p95_jitter_s: float     # 95th percentile jitter
    p99_jitter_s: float     # 99th percentile jitter
    max_jitter_s: float     # Maximum jitter observed
Source: rfx/python/rfx/session.py:32-55

Example Output

import rfx

robot = rfx.MockRobot(state_dim=12, action_dim=6)
stats = rfx.run(robot, my_policy, rate_hz=50, duration=5.0)

print(stats.iterations)       # 250
print(stats.overruns)         # 0
print(stats.target_period_s)  # 0.02 (50 Hz)
print(stats.avg_period_s)     # 0.0201
print(stats.p50_jitter_s)     # 0.0001
print(stats.p95_jitter_s)     # 0.0003
print(stats.p99_jitter_s)     # 0.0005
print(stats.max_jitter_s)     # 0.0008

Converting to Dict

stats = rfx.run(robot, policy, rate_hz=50, duration=10.0)
data = stats.to_dict()

import json
print(json.dumps(data, indent=2))
# {
#   "iterations": 500,
#   "overruns": 0,
#   "target_period_s": 0.02,
#   "avg_period_s": 0.0201,
#   "p50_jitter_s": 0.0001,
#   "p95_jitter_s": 0.0003,
#   "p99_jitter_s": 0.0005,
#   "max_jitter_s": 0.0008
# }
Source: rfx/python/rfx/session.py:45-55

Control Loop Internals

The control thread runs this loop:
while running:
    loop_start = time.perf_counter()
    
    # 1. Observe
    obs = robot.observe()
    
    # 2. Compute action (no gradients)
    with torch.no_grad():
        action = policy(obs)
    
    # 3. Execute
    robot.act(action)
    
    # 4. Sleep until next deadline
    sleep_until(next_deadline)
    next_deadline += target_period
Source: rfx/python/rfx/session.py:207-249

Hybrid Sleep Strategy

For precise timing, the loop uses hybrid sleep:
sleep_s = next_deadline - time.perf_counter()

if sleep_s > 0:
    # Coarse sleep for most of the wait
    spin_window_s = 0.0012  # ~1.2ms
    if sleep_s > spin_window_s:
        time.sleep(sleep_s - spin_window_s)
    
    # Busy spin for last ~1.2ms (more precise)
    while time.perf_counter() < next_deadline:
        pass
Source: rfx/python/rfx/session.py:238-245
This achieves sub-millisecond timing accuracy on most systems without requiring real-time OS patches.

Usage Patterns

Infinite Loop

import rfx

robot = rfx.RealRobot.from_config(rfx.SO101_CONFIG, port="/dev/ttyACM0")

# Run until Ctrl+C
stats = rfx.run(robot, my_policy, rate_hz=50, duration=None)
print(f"Stopped after {stats.iterations} steps")

Timed Run

import rfx

robot = rfx.MockRobot(state_dim=12, action_dim=6)

# Run for exactly 10 seconds
stats = rfx.run(robot, my_policy, rate_hz=50, duration=10.0)
print(f"Completed {stats.iterations} steps in 10 seconds")

Manual Control

import rfx
import time

robot = rfx.MockRobot(state_dim=12, action_dim=6)
session = rfx.Session(robot, my_policy, rate_hz=50)

# Start loop
session.start()

try:
    # Do other work while loop runs
    while session.step_count < 1000:
        time.sleep(0.1)
        session.check_health()  # Raise if loop failed
finally:
    session.stop()

print(f"Final stats: {session.stats}")

Error Handling

import rfx

robot = rfx.MockRobot(state_dim=12, action_dim=6)

try:
    stats = rfx.run(robot, my_policy, rate_hz=50, duration=10.0)
except KeyboardInterrupt:
    print("Stopped by user")
except RuntimeError as e:
    print(f"Control loop failed: {e}")
Source: rfx/python/rfx/session.py:143-146

Rust Control Loop

For ultra-low latency, use the Rust control loop:
import rfx

# Requires Rust extension to be built
if rfx.run_control_loop is not None:
    handle = rfx.run_control_loop(
        robot=robot,
        policy=policy,
        rate_hz=1000,  # Can run at kHz rates
    )
    
    # Loop runs in Rust thread
    handle.wait(duration=10.0)
    
    # Get stats
    stats = handle.stats()
    print(stats.avg_period_s)
else:
    print("Rust extension not available")
Source: rfx/python/rfx/__init__.py:22
The Rust control loop can achieve less than 100μs jitter on real-time kernels, ideal for high-frequency control (1kHz+).

Best Practices

Different robots have different optimal control rates:
# Arm robots: 20-50 Hz is typical
rfx.run(so101_robot, policy, rate_hz=50)

# Legged robots: 100-500 Hz for stable walking
rfx.run(go2_robot, policy, rate_hz=200)

# High-precision tasks: 100-200 Hz
rfx.run(robot, precise_policy, rate_hz=100)
Monitor stats.overruns to ensure your policy is fast enough.
Give the robot time to settle after reset:
# Default 0.5s warmup
stats = rfx.run(robot, policy, rate_hz=50, warmup_s=0.5)

# Longer warmup for heavy robots
stats = rfx.run(humanoid, policy, rate_hz=100, warmup_s=2.0)

# No warmup for simulation
stats = rfx.run(sim_robot, policy, rate_hz=50, warmup_s=0.0)
High jitter indicates an overloaded system:
stats = rfx.run(robot, policy, rate_hz=50, duration=10.0)

# Check jitter
if stats.p95_jitter_s > 0.005:  # > 5ms
    print("WARNING: High jitter detected")
    print(f"p95: {stats.p95_jitter_s * 1000:.2f} ms")
    print(f"p99: {stats.p99_jitter_s * 1000:.2f} ms")

# Check overruns
overrun_rate = stats.overruns / stats.iterations
if overrun_rate > 0.01:  # > 1%
    print(f"WARNING: {overrun_rate * 100:.1f}% overruns")
    print("Consider: lower rate_hz or optimize policy")
Ensure proper shutdown even on errors:
with rfx.Session(robot, policy, rate_hz=50) as session:
    session.run(duration=10.0)
    # Session.stop() called automatically on exit
Source: rfx/python/rfx/session.py:250-255

Example: Complete Deploy Script

#!/usr/bin/env python3
"""Deploy policy to robot with logging and error handling."""

import argparse
import torch
import rfx

@rfx.policy
def my_policy(obs):
    return obs["state"]  # Simple hold position

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mock", action="store_true")
    parser.add_argument("--rate", type=float, default=50)
    parser.add_argument("--duration", type=float, default=None)
    args = parser.parse_args()
    
    # Create robot
    if args.mock:
        robot = rfx.MockRobot(state_dim=12, action_dim=6)
    else:
        robot = rfx.RealRobot.from_config(
            rfx.SO101_CONFIG,
            port="/dev/ttyACM0"
        )
    
    print(f"Robot: {robot}")
    print(f"Running at {args.rate} Hz...")
    
    try:
        # Run control loop
        stats = rfx.run(
            robot,
            my_policy,
            rate_hz=args.rate,
            duration=args.duration
        )
        
        # Report results
        print(f"\nCompleted {stats.iterations} steps")
        print(f"Overruns: {stats.overruns}")
        print(f"Avg period: {stats.avg_period_s * 1000:.2f} ms")
        print(f"Jitter p50: {stats.p50_jitter_s * 1000:.2f} ms")
        print(f"Jitter p95: {stats.p95_jitter_s * 1000:.2f} ms")
        
    except KeyboardInterrupt:
        print("\nStopped by user")
    except Exception as e:
        print(f"\nError: {e}")
        raise

if __name__ == "__main__":
    main()
Source: rfx/examples/deploy_real.py:75-136

Robot Interface

Understand the Robot protocol

Policies

Write policies for the control loop

Observations

Work with observation dictionaries

Build docs developers (and LLMs) love