The rfx.session module provides a Session class that connects a policy to a robot with lifecycle management, rate control, jitter tracking, error handling, and clean shutdown.

Session

A hardened connection between an inference policy and a robot. Session connects a policy callable to a robot and handles rate control, jitter tracking, error handling, and clean shutdown. The control loop runs in a background thread, so callers are not forced to use async code.

Constructor

Session(
    robot: Robot,
    policy: Callable[[dict[str, torch.Tensor]], torch.Tensor],
    rate_hz: float = 50,
    warmup_s: float = 0.5
)
robot (Robot, required): Any object satisfying the Robot protocol (observe/act/reset)
policy (Callable, required): Callable that maps an observation dict to an action tensor
rate_hz (float, default: 50): Target control loop frequency in Hz
warmup_s (float, default: 0.5): Seconds to sleep after reset before starting the loop
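For trying out a policy without hardware, a stand-in robot can be useful. The sketch below assumes the Robot protocol consists of observe(), act(action), and reset(), as listed above. The exact signatures, the "state" key, the names RobotLike, FakeRobot, and constant_policy, and the use of plain lists in place of torch.Tensor are all illustrative assumptions, not part of the rfx API.

```python
from typing import Protocol


class RobotLike(Protocol):
    """Hypothetical shape of the Robot protocol (observe/act/reset)."""

    def observe(self) -> dict: ...
    def act(self, action) -> None: ...
    def reset(self) -> dict: ...


class FakeRobot:
    """In-memory stand-in that adds each commanded delta to its state."""

    def __init__(self, dof: int = 2) -> None:
        self.state = [0.0] * dof
        self.actions_received = 0

    def observe(self) -> dict:
        # Return a copy so callers cannot mutate the internal state.
        return {"state": list(self.state)}

    def act(self, action) -> None:
        self.state = [s + a for s, a in zip(self.state, action)]
        self.actions_received += 1

    def reset(self) -> dict:
        self.state = [0.0] * len(self.state)
        self.actions_received = 0
        return self.observe()


def constant_policy(obs: dict) -> list:
    """Policy callable: observation dict in, action out."""
    return [0.5 for _ in obs["state"]]


# Drive the fake robot by hand for three steps, as a control loop would.
robot: RobotLike = FakeRobot()
robot.reset()
for _ in range(3):
    robot.act(constant_policy(robot.observe()))
print(robot.observe())
```

A real deployment would pass a hardware-backed robot and a tensor-returning policy to Session instead.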

Properties

step_count (int): Current number of control loop iterations (thread-safe)
is_running (bool): Whether the control loop is currently running
stats (SessionStats): Loop timing statistics (thread-safe)

Methods

start()

Reset robot and spawn the daemon control thread.
session = Session(robot, policy, rate_hz=50)
session.start()
# Control loop now running in background

stop()

Signal the control thread to stop, then join it. Idempotent: safe to call multiple times.
session.stop()  # Gracefully stop the control loop
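One common way to get an idempotent stop is a threading.Event plus a join that is skipped once the thread handle has been cleared. The sketch below illustrates that pattern with a hypothetical Loop class; it is not the rfx implementation.

```python
import threading
import time


class Loop:
    """Hypothetical background loop with an idempotent stop()."""

    def __init__(self) -> None:
        self._stop_event = threading.Event()
        self._thread = None
        self.ticks = 0

    def start(self) -> None:
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Poll the event each iteration so stop() takes effect promptly.
        while not self._stop_event.is_set():
            self.ticks += 1
            time.sleep(0.001)

    def stop(self) -> None:
        self._stop_event.set()
        # Take the handle and clear it, so a second stop() has nothing to join.
        thread, self._thread = self._thread, None
        if thread is not None:
            thread.join()


loop = Loop()
loop.start()
time.sleep(0.05)
loop.stop()
loop.stop()  # Second call is a no-op
```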

run(duration=None)

Blocks the calling thread while the control loop runs. With duration=None, the loop runs until Ctrl+C or stop() is called.
duration (float | None, default: None): Maximum run time in seconds. None runs indefinitely.
# Run for 10 seconds
session.run(duration=10.0)

# Run until Ctrl+C
session.run()  # Press Ctrl+C to stop

check_health()

Raise if the control thread encountered an error.
try:
    session.check_health()
except RuntimeError as e:
    print(f"Control loop failed: {e}")
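An exception raised inside a background thread does not propagate to the caller on its own, which is why an explicit health check is needed. The sketch below shows one way such a check can work: the thread stores the exception, and check_health() re-raises it as a RuntimeError. The Worker class and flaky_step function are hypothetical, and rfx may store or wrap errors differently.

```python
import threading


class Worker:
    """Hypothetical worker that records errors from its background thread."""

    def __init__(self, step) -> None:
        self._step = step
        self._error = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def join(self) -> None:
        self._thread.join()

    def _run(self) -> None:
        try:
            while True:
                self._step()
        except Exception as exc:
            # Store the error instead of letting the thread die silently.
            self._error = exc

    def check_health(self) -> None:
        if self._error is not None:
            raise RuntimeError(f"control loop failed: {self._error}") from self._error


calls = {"n": 0}


def flaky_step() -> None:
    """Fails on the third call to simulate a hardware fault."""
    calls["n"] += 1
    if calls["n"] == 3:
        raise ValueError("sensor timeout")


worker = Worker(flaky_step)
worker.start()
worker.join()  # The thread exits once the step raises

message = None
try:
    worker.check_health()
except RuntimeError as e:
    message = str(e)
print(message)
```

In a long-running application, calling check_health() periodically from the main thread surfaces such failures early.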

Context Manager

Session can be used as a context manager for automatic cleanup:
with Session(robot, policy, rate_hz=50) as s:
    s.run(duration=10.0)
    print(s.stats)
# Automatically stopped on exit

SessionStats

Loop timing summary for jitter and overrun analysis. Immutable dataclass.

Fields

iterations (int): Total number of completed iterations
overruns (int): Number of timing overruns (iterations that exceeded target period)
target_period_s (float): Target loop period in seconds (1 / rate_hz)
avg_period_s (float): Average actual loop period in seconds
p50_jitter_s (float): Median jitter (50th percentile) in seconds
p95_jitter_s (float): 95th percentile jitter in seconds
p99_jitter_s (float): 99th percentile jitter in seconds
max_jitter_s (float): Maximum jitter observed in seconds
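To make the fields concrete, the sketch below derives the same quantities from a list of measured loop periods and per-iteration execution times. It assumes jitter is |actual_period - target_period| and uses a nearest-rank percentile; the percentile and summarize helpers are hypothetical, and the percentile method rfx uses may differ.

```python
import math


def percentile(sorted_values, q):
    """Nearest-rank percentile of an already sorted list, q in (0, 100]."""
    if not sorted_values:
        return 0.0
    rank = max(1, math.ceil(q / 100 * len(sorted_values)))
    return sorted_values[rank - 1]


def summarize(periods, exec_times, rate_hz):
    """Build a stats dict from measured periods and execution times (seconds)."""
    target = 1.0 / rate_hz
    jitter = sorted(abs(p - target) for p in periods)
    return {
        "iterations": len(periods),
        # An overrun is an iteration whose work took longer than the target period.
        "overruns": sum(1 for t in exec_times if t > target),
        "target_period_s": target,
        "avg_period_s": sum(periods) / len(periods) if periods else 0.0,
        "p50_jitter_s": percentile(jitter, 50),
        "p95_jitter_s": percentile(jitter, 95),
        "p99_jitter_s": percentile(jitter, 99),
        "max_jitter_s": jitter[-1] if jitter else 0.0,
    }


# Four iterations at a 50 Hz target (20 ms period); the last one overruns.
periods = [0.020, 0.021, 0.019, 0.025]
exec_times = [0.005, 0.006, 0.004, 0.024]
summary = summarize(periods, exec_times, rate_hz=50)
print(summary)
```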

Methods

to_dict()

Convert stats to a dictionary.
Returns (dict[str, float | int]): Dictionary with all stats fields
stats = session.stats
stats_dict = stats.to_dict()
print(f"Iterations: {stats_dict['iterations']}")
print(f"P95 jitter: {stats_dict['p95_jitter_s'] * 1000:.2f} ms")
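A dictionary form is convenient for logging or serializing stats. The sketch below shows how a frozen dataclass with a to_dict() method can behave, using a hypothetical StatsSketch class with only three of the fields; it illustrates the pattern and is not the SessionStats source.

```python
import json
from dataclasses import FrozenInstanceError, asdict, dataclass


@dataclass(frozen=True)
class StatsSketch:
    """Hypothetical, cut-down stand-in for an immutable stats record."""

    iterations: int
    overruns: int
    p95_jitter_s: float

    def to_dict(self) -> dict:
        return asdict(self)


s = StatsSketch(iterations=500, overruns=2, p95_jitter_s=0.0004)

# The dict form serializes directly to JSON for logs or dashboards.
payload = json.dumps(s.to_dict())
print(payload)

# Frozen dataclasses reject attribute assignment.
frozen = False
try:
    s.iterations = 0
except FrozenInstanceError:
    frozen = True
```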

run()

One-liner function to connect a policy to a robot and run inference.
run(
    robot: Robot,
    policy: Callable[[dict[str, torch.Tensor]], torch.Tensor],
    rate_hz: float = 50,
    duration: float | None = None,
    warmup_s: float = 0.5
) -> SessionStats
robot (Robot, required): Any object satisfying the Robot protocol
policy (Callable, required): Callable mapping observation dict to action tensor
rate_hz (float, default: 50): Target control loop frequency in Hz
duration (float | None, default: None): Run time in seconds. None for infinite (Ctrl+C to stop)
warmup_s (float, default: 0.5): Seconds to sleep after reset before starting the loop
Returns (SessionStats): Timing and jitter statistics from the run
import rfx

# One-liner deployment
stats = rfx.run(robot, policy, rate_hz=50, duration=10.0)

print(f"Completed {stats.iterations} steps")
print(f"Overruns: {stats.overruns}")
print(f"Average loop time: {stats.avg_period_s * 1000:.2f} ms")
print(f"P99 jitter: {stats.p99_jitter_s * 1000:.2f} ms")

Example Usage

Basic Session

import rfx
from rfx.session import Session

# Load robot and policy
robot = rfx.robot.lerobot.so101()
policy = load_my_policy()  # Your policy here

# Create and run session
with Session(robot, policy, rate_hz=50) as session:
    session.run(duration=10.0)
    
    # Print statistics
    stats = session.stats
    print(f"Iterations: {stats.iterations}")
    print(f"Overruns: {stats.overruns}")
    print(f"P95 jitter: {stats.p95_jitter_s * 1000:.2f} ms")

Manual Control

import time
from rfx.session import Session

session = Session(robot, policy, rate_hz=50)
session.start()

# Run for a while
time.sleep(5.0)

# Check progress
print(f"Steps so far: {session.step_count}")

# Run more
time.sleep(5.0)

# Stop and get stats
session.stop()
stats = session.stats
print(f"Total iterations: {stats.iterations}")

One-Liner Deployment

import rfx

robot = rfx.robot.lerobot.so101()
policy = load_my_policy()

# Simple one-liner
stats = rfx.run(robot, policy, rate_hz=50, duration=30.0)

Continuous Monitoring

import time
from rfx.session import Session

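with Session(robot, policy, rate_hz=50) as session:
    # If entering the context does not start the loop in your version,
    # call session.start() here before monitoring.
    # Monitor for 60 seconds, printing stats once per second
    start = time.monotonic()
    while time.monotonic() - start < 60:
        time.sleep(1.0)
        stats = session.stats
        # Guard against division by zero before the first iteration completes
        hz = 1 / stats.avg_period_s if stats.avg_period_s > 0 else 0.0
        print(f"Steps: {stats.iterations}, "
              f"Hz: {hz:.1f}, "
              f"Overruns: {stats.overruns}")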

Performance Notes

  • The control loop uses a hybrid sleep strategy to minimize jitter: a coarse sleep, followed by a busy spin for the last ~1.2 ms
  • Jitter is calculated as |actual_period - target_period|
  • Overruns occur when loop execution time exceeds the target period
  • Statistics are tracked with a maximum of 10,000 samples (configurable via _MAX_TIMING_SAMPLES)
  • The control loop is thread-based and runs as a daemon thread
  • All property accesses are thread-safe
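The hybrid sleep strategy and the bounded sample buffer described in the notes above can be sketched as follows. This is a minimal illustration under stated assumptions: the constants SPIN_WINDOW_S and MAX_SAMPLES and the helpers sleep_until and run_loop are hypothetical names, and the real loop may schedule deadlines differently.

```python
import time
from collections import deque

SPIN_WINDOW_S = 0.0012  # Busy-spin for roughly the last 1.2 ms
MAX_SAMPLES = 10_000    # Keep only the most recent timing samples


def sleep_until(deadline: float, spin_window_s: float = SPIN_WINDOW_S) -> None:
    """Coarse sleep for most of the wait, then busy-spin up to the deadline."""
    remaining = deadline - time.perf_counter()
    if remaining > spin_window_s:
        time.sleep(remaining - spin_window_s)
    while time.perf_counter() < deadline:
        pass  # Spinning avoids the scheduler's wake-up latency


def run_loop(step, rate_hz: float, iterations: int) -> deque:
    """Run step() at a fixed rate and return per-iteration jitter samples."""
    period = 1.0 / rate_hz
    jitter = deque(maxlen=MAX_SAMPLES)  # Oldest samples are dropped when full
    last = time.perf_counter()
    next_deadline = last + period
    for _ in range(iterations):
        step()
        sleep_until(next_deadline)
        now = time.perf_counter()
        jitter.append(abs((now - last) - period))
        last = now
        # After an overrun, resynchronize rather than trying to catch up.
        next_deadline = max(next_deadline + period, now)
    return jitter


samples = run_loop(lambda: None, rate_hz=200, iterations=20)
print(f"max jitter: {max(samples) * 1000:.3f} ms")
```

The busy spin trades a small amount of CPU time for tighter timing, since time.sleep() alone can wake later than requested.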
