The rfx.deploy module provides a simple interface to deploy trained policies to robots. Load a model, connect to hardware, and run inference with a single function call.
deploy()
Deploy a trained policy to a robot. One function, zero ceremony.
deploy(
policy_source: str | Path,
*,
robot: str | None = None,
config: str | Path | None = None,
port: str | None = None,
rate_hz: float | None = None,
duration: float | None = None,
mock: bool = False,
device: str = "cpu",
warmup_s: float = 0.5,
verbose: bool = True
) -> SessionStats
Parameters
Path to saved policy directory, or "hf://org/repo" for HuggingFace models, or path to a .py file with @rfx.policy decorated function
Robot type: "so101", "go2", "g1", or "innate". Auto-detected from policy config if saved. Can also be a path to a YAML config file.
config
str | Path | None
default:"None"
Path to robot YAML config file. Overrides robot parameter if provided.
Serial port or IP address override (e.g., "/dev/ttyACM0", "192.168.123.161")
rate_hz
float | None
default:"None"
Control loop frequency in Hz. Defaults to robot config’s control_freq_hz.
duration
float | None
default:"None"
Run time in seconds. None means infinite (press Ctrl+C to stop).
If True, use MockRobot instead of real hardware for testing.
Torch device for policy inference ("cpu" or "cuda")
Seconds to sleep after reset before starting the control loop
Print status messages and statistics during deployment
Returns
Timing and jitter statistics from the deployment session
Raises
ValueError: If robot type cannot be determined or is invalid
RuntimeError: If connection to robot fails
FileNotFoundError: If policy source or config file not found
Usage Examples
Deploy from Local Directory
import rfx
# Deploy with explicit robot type
stats = rfx.deploy("runs/my-policy", robot="so101")
# Deploy with custom config
stats = rfx.deploy("runs/my-policy", config="configs/custom_so101.yaml")
# Deploy with custom port
stats = rfx.deploy(
"runs/my-policy",
robot="so101",
port="/dev/ttyACM1"
)
Deploy from HuggingFace
import rfx
# Deploy from HuggingFace Hub
stats = rfx.deploy(
"hf://user/my-policy",
robot="go2",
duration=60.0
)
# Deploy with custom control frequency
stats = rfx.deploy(
"hf://org/policy-v2",
robot="g1",
rate_hz=100,
device="cuda"
)
Deploy Python Policy Function
import rfx
# Deploy from a .py file with @rfx.policy decorator
stats = rfx.deploy(
"my_policy.py",
robot="so101",
duration=30.0
)
Example my_policy.py:
import torch
import rfx
@rfx.policy
def my_policy(obs):
# Your policy implementation
state = obs["state"]
action = torch.zeros_like(state)
return action
Test with Mock Robot
import rfx
# Test deployment without hardware
stats = rfx.deploy(
"runs/my-policy",
robot="so101",
mock=True, # Use mock robot
duration=10.0
)
print(f"Mock run completed {stats.iterations} steps")
Deploy with Custom Settings
import rfx
stats = rfx.deploy(
"runs/trained-policy",
robot="go2",
port="192.168.1.100", # Custom IP
rate_hz=200, # High-frequency control
duration=120.0, # 2 minutes
device="cuda", # GPU inference
warmup_s=1.0, # Longer warmup
verbose=True
)
# Analyze results
print(f"Completed {stats.iterations} iterations")
print(f"Control rate: {1/stats.avg_period_s:.1f} Hz")
print(f"Overruns: {stats.overruns}")
print(f"P99 jitter: {stats.p99_jitter_s * 1000:.2f} ms")
Deploy Until Interrupted
import rfx
# Run indefinitely until Ctrl+C
stats = rfx.deploy(
"hf://user/my-policy",
robot="so101",
duration=None # Infinite
)
# Press Ctrl+C to stop
print(f"Stopped after {stats.iterations} steps")
CLI Usage
The deploy function can also be used from the command line:
# Basic deployment
rfx deploy runs/my-policy --robot so101
# Deploy from HuggingFace
rfx deploy hf://user/my-policy --robot go2 --duration 60
# Deploy Python file
rfx deploy my_policy.py --robot so101
# Test with mock robot
rfx deploy runs/my-policy --mock
# Custom settings
rfx deploy runs/policy \
--robot go2 \
--port 192.168.123.161 \
--rate-hz 200 \
--duration 120 \
--device cuda
Robot Auto-Detection
The deploy() function tries to determine the robot type in this order:
- Explicit config file (
--config parameter)
- Explicit robot type (
--robot parameter)
- Bundled robot config (saved with the policy)
- Policy config metadata (
rfx_config.json)
If none of these provide robot information, deployment will fail with a helpful error message.
# These are equivalent if policy was saved with robot config:
stats = rfx.deploy("runs/my-policy") # Auto-detect
stats = rfx.deploy("runs/my-policy", robot="so101") # Explicit
Built-in Robot Types
The following robot types are built-in:
"so101" - SO-101 6-DOF arm
"go2" - Unitree Go2 quadruped
"g1" - Unitree G1 humanoid
"innate" - Innate manipulation arm
# Deploy to different robot types
stats = rfx.deploy("policy", robot="so101")
stats = rfx.deploy("policy", robot="go2")
stats = rfx.deploy("policy", robot="g1")
Custom Robot Configs
You can provide a custom YAML config file:
stats = rfx.deploy(
"runs/my-policy",
config="configs/custom_robot.yaml"
)
Example YAML config:
name: "CustomRobot"
state_dim: 12
action_dim: 6
max_state_dim: 64
max_action_dim: 64
control_freq_hz: 50
hardware:
port: "/dev/ttyACM0"
baudrate: 1000000
Output and Monitoring
When verbose=True (default), deploy prints:
- Loading phase: Policy type and bundled config info
- Robot info: Robot instance and run parameters
- Live progress: Steps, Hz, and remaining time (for timed runs)
- Final statistics: Iterations, overruns, and jitter metrics
Example output:
[rfx] Loading policy from runs/my-policy...
[rfx] Policy type: lerobot
[rfx] Bundled robot config: SO-101
[rfx] Robot: SO101Robot(num_envs=1, state_dim=12, action_dim=6, device='cpu')
[rfx] Running at 50 Hz for 10.0s
[rfx] 500 steps | 50 Hz | 0s remaining
[rfx] Done - 500 steps, 0 overruns
[rfx] avg period: 20.01 ms (target: 20.00 ms)
[rfx] jitter p50: 0.05 ms
[rfx] jitter p95: 0.15 ms
[rfx] jitter p99: 0.25 ms
Error Handling
import rfx
try:
stats = rfx.deploy(
"runs/my-policy",
robot="so101",
duration=10.0
)
except ValueError as e:
print(f"Configuration error: {e}")
except RuntimeError as e:
print(f"Runtime error: {e}")
except KeyboardInterrupt:
print("Deployment interrupted by user")