A policy in rfx is any callable that maps observations to actions: `Dict[str, Tensor] → Tensor`. Policies can be functions, classes, or neural networks. The `@rfx.policy` decorator marks them as deployable.
The @policy Decorator
Mark any function as an rfx policy so rfx deploy can discover it:
```python
import rfx
import torch

@rfx.policy
def my_policy(obs):
    return torch.zeros(1, 64)  # (num_envs, max_action_dim)
```
Source: rfx/python/rfx/decorators.py:31-63
Usage Styles

```python
@rfx.policy
def hold_position(obs):
    return obs["state"]  # Hold current position
```
Policy Types
Function-Based
```python
import rfx
import torch

@rfx.policy
def zero_action(obs):
    """Stay still — useful for testing."""
    return torch.zeros(1, 64)

@rfx.policy
def mirror_state(obs):
    """Copy state as action (for kinesthetic teaching)."""
    return obs["state"]
```
Neural Network
```python
import torch
import torch.nn as nn

import rfx

class SimpleVLA(nn.Module):
    """Minimal MLP: obs["state"] → action."""

    def __init__(self, state_dim: int, action_dim: int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
        )

    def forward(self, obs: dict) -> torch.Tensor:
        return self.net(obs["state"])

# Mark the instance, not the class
policy = SimpleVLA(state_dim=64, action_dim=64)
policy._rfx_policy = True
```
Source: rfx/examples/deploy_real.py:58-72
With MotorCommands
Use named joints instead of raw tensor indices:
```python
import rfx

@rfx.policy
def grip_and_lift(obs):
    cmd = rfx.MotorCommands(
        {
            "gripper": 0.8,    # Close gripper
            "shoulder": -0.5,  # Lift shoulder
            "elbow": -0.3,     # Bend elbow
        },
        config=rfx.SO101_CONFIG,
    )
    return cmd.to_tensor()  # (1, 64)
```
Source: rfx/python/rfx/decorators.py:8-13
MotorCommands API
Build actions from human-readable joint names:
```python
class MotorCommands:
    def __init__(
        self,
        positions: dict[str, float] | None = None,
        *,
        config: RobotConfig | None = None,
        kp: float = 20.0,
        kd: float = 0.5,
    ):
        ...

    def to_tensor(self, batch_size: int = 1) -> torch.Tensor:
        """Convert to (batch_size, max_action_dim) padded tensor."""
        ...

    def to_list(self) -> list[float]:
        """Convert to flat list of length action_dim."""
        ...
```
Source: rfx/python/rfx/decorators.py:66-131
Examples
```python
import rfx

cmd = rfx.MotorCommands(
    {"gripper": 0.8, "elbow": -0.3},
    config=rfx.SO101_CONFIG,
)
action = cmd.to_tensor()  # (1, 64)
robot.act(action)
```
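To make the padding behavior concrete, here is a minimal sketch in plain torch. This is illustrative only, not the rfx implementation: the `to_padded_tensor` helper, the joint ordering, and `max_action_dim=64` are all assumptions.

```python
import torch

def to_padded_tensor(positions, joint_order, max_action_dim=64, batch_size=1):
    # Map named joints onto their configured slot in a zero-padded action
    # tensor, mirroring what MotorCommands.to_tensor() is described to do.
    out = torch.zeros(batch_size, max_action_dim)
    for name, value in positions.items():
        if name not in joint_order:
            raise ValueError(f"Joint '{name}' not found. Available: {joint_order}")
        out[:, joint_order.index(name)] = value
    return out

action = to_padded_tensor({"gripper": 0.8, "elbow": -0.3},
                          ["shoulder", "elbow", "wrist", "gripper"])
print(action.shape)  # torch.Size([1, 64])
```

Unnamed joints stay at zero, which is why the returned tensor is always the full `(batch_size, max_action_dim)` shape regardless of how many joints you specify.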
Error Handling
```python
import rfx

try:
    cmd = rfx.MotorCommands({"invalid_joint": 0.5}, config=rfx.SO101_CONFIG)
    action = cmd.to_tensor()
except ValueError as e:
    # Joint 'invalid_joint' not found in SO-101 config.
    # Available: shoulder, elbow, wrist, gripper, ...
    print(e)
```
Source: rfx/python/rfx/decorators.py:119-128
Observation Format
Policies receive a dictionary with these possible keys:
```python
def policy(obs: dict[str, torch.Tensor]) -> torch.Tensor:
    state = obs["state"]        # Always present: (num_envs, max_state_dim)
    images = obs.get("images")  # Optional: (num_envs, num_cams, H, W, 3)
    lang = obs.get("language")  # Optional: (num_envs, seq_len)
    # Your logic here
    return action  # (num_envs, max_action_dim)
```
The "state" key is always present. Image and language modalities are optional and depend on your robot configuration.
Action Output
Policies must return a tensor with one of these shapes:
```python
# Single-step action
torch.Tensor  # (num_envs, max_action_dim)

# Multi-step action (action chunking)
torch.Tensor  # (num_envs, horizon, max_action_dim)
```
Single-Step Example
```python
@rfx.policy
def simple_controller(obs):
    # Process state
    state = obs["state"]   # (1, 64)
    # Compute action
    action = model(state)  # (1, 64)
    return action
```
Action Chunking Example
```python
@rfx.policy
def chunked_controller(obs):
    # Generate 10-step action sequence
    actions = model(obs["state"])  # (1, 10, 64)
    return actions  # Robot will execute all 10 steps
```
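How a chunk gets consumed can be pictured as a plain loop over the horizon dimension. This is a sketch only, not rfx's actual executor, and the `execute_chunk` helper is hypothetical:

```python
import torch

def execute_chunk(act_fn, actions):
    # actions: (num_envs, horizon, max_action_dim); send one timestep at a time
    for t in range(actions.shape[1]):
        act_fn(actions[:, t, :])

# Stub that records what would be sent to the robot
sent = []
execute_chunk(sent.append, torch.zeros(1, 10, 64))
print(len(sent))  # 10
```

The point of chunking is that the policy is queried once per horizon instead of once per control step, which can matter when inference is slow relative to the control rate.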
Deployment
Once decorated, deploy policies with the CLI:
```shell
# Deploy to real robot
rfx deploy my_policy.py --robot so101 --port /dev/ttyACM0

# Test in mock mode
rfx deploy my_policy.py --mock

# Load checkpoint
rfx deploy my_policy.py --checkpoint model.pt --robot so101
```
The `rfx deploy` command automatically discovers functions marked with `@rfx.policy` in your file.
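Discovery can be approximated by scanning a namespace for callables carrying the marker attribute. A minimal sketch, assuming (as the instance-marking example above suggests) that `@rfx.policy` sets `_rfx_policy = True`; `discover_policies` is not part of the rfx API:

```python
def discover_policies(namespace: dict):
    # Collect callables flagged with the _rfx_policy marker attribute
    return {
        name: obj
        for name, obj in namespace.items()
        if callable(obj) and getattr(obj, "_rfx_policy", False)
    }

def my_policy(obs):
    return obs["state"]

my_policy._rfx_policy = True  # what @rfx.policy is assumed to set

def helper(x):  # unmarked: ignored by discovery
    return x

ns = {"my_policy": my_policy, "helper": helper}
found = discover_policies(ns)
print(list(found))  # ['my_policy']
```

This is why both decorated functions and attribute-marked module instances (like the `SimpleVLA` example) are deployable: the marker is the same in both cases.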
Loading Policies
Load policies from the rfx hub:
```python
import rfx

# Load from hub
policy = rfx.load_policy("username/repo-name")

# Inspect before loading
info = rfx.inspect_policy("username/repo-name")
print(info.description)
print(info.robot_types)

# Use with robot
robot = rfx.MockRobot(state_dim=12, action_dim=6)
stats = rfx.run(robot, policy, rate_hz=50, duration=5.0)
```
Source: rfx/python/rfx/__init__.py:73
Best Practices
Use torch.no_grad() for inference
Disable gradient computation when running policies:

```python
import torch

with torch.no_grad():
    action = policy(obs)
```

The Session and run() helpers do this automatically.
Source: rfx/python/rfx/session.py:220-222
Validate action shapes
Ensure your policy outputs match the robot's dimensions:

```python
@rfx.policy
def my_policy(obs):
    action = model(obs["state"])
    # Validate
    expected = (obs["state"].shape[0], 64)  # (num_envs, max_action_dim)
    assert action.shape == expected, f"Shape mismatch: {action.shape} != {expected}"
    return action
```
Test with MockRobot first
Always validate your policy in mock mode before deploying to hardware:

```python
import rfx

# Test policy
robot = rfx.MockRobot(state_dim=12, action_dim=6)
stats = rfx.run(robot, my_policy, rate_hz=50, duration=5.0)
print(f"Completed {stats.iterations} steps")
print(f"Average period: {stats.avg_period_s * 1000:.2f} ms")
```
See also:
- Robot Interface: understand the Robot protocol
- Observations: work with multi-modal observations
- Control Loop: run policies with Session