A policy in rfx is any callable that maps observations to actions:
Dict[str, Tensor] → Tensor
Policies can be functions, classes, or neural networks. The @rfx.policy decorator marks them as deployable.
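To make the contract concrete, here is a minimal callable class (plain torch, not part of rfx) that satisfies Dict[str, Tensor] → Tensor. The ProportionalController name and its gain logic are illustrative, not from the library:

```python
import torch

class ProportionalController:
    """Any callable mapping Dict[str, Tensor] -> Tensor satisfies the
    policy contract. Illustrative sketch: drive the observed state
    toward a fixed target with a proportional term."""

    def __init__(self, target: torch.Tensor, gain: float = 0.5):
        self.target = target
        self.gain = gain

    def __call__(self, obs: dict) -> torch.Tensor:
        # action = gain * (target - state), same batch shape as the state
        return self.gain * (self.target - obs["state"])

policy = ProportionalController(target=torch.zeros(1, 4))
action = policy({"state": torch.ones(1, 4)})  # shape (1, 4)
```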

The @policy Decorator

Mark any function as an rfx policy so rfx deploy can discover it:
import rfx
import torch

@rfx.policy
def my_policy(obs):
    return torch.zeros(1, 64)  # (num_envs, max_action_dim)
Source: rfx/python/rfx/decorators.py:31-63

Usage Styles

@rfx.policy
def hold_position(obs):
    return obs["state"]  # Hold current position

Policy Types

Function-Based

import rfx
import torch

@rfx.policy
def zero_action(obs):
    """Stay still — useful for testing."""
    return torch.zeros(1, 64)

@rfx.policy
def mirror_state(obs):
    """Copy state as action (for kinesthetic teaching)."""
    return obs["state"]

Neural Network

import torch.nn as nn
import rfx

class SimpleVLA(nn.Module):
    """Minimal MLP: obs["state"] → action."""
    
    def __init__(self, state_dim: int, action_dim: int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
        )
    
    def forward(self, obs: dict) -> torch.Tensor:
        return self.net(obs["state"])

# Mark the instance, not the class
policy = SimpleVLA(state_dim=64, action_dim=64)
policy._rfx_policy = True
Source: rfx/examples/deploy_real.py:58-72

With MotorCommands

Use named joints instead of raw tensor indices:
import rfx

@rfx.policy
def grip_and_lift(obs):
    cmd = rfx.MotorCommands(
        {
            "gripper": 0.8,      # Close gripper
            "shoulder": -0.5,    # Lift shoulder
            "elbow": -0.3        # Bend elbow
        },
        config=rfx.SO101_CONFIG
    )
    return cmd.to_tensor()  # (1, 64)
Source: rfx/python/rfx/decorators.py:8-13

MotorCommands API

Build actions from human-readable joint names:
class MotorCommands:
    def __init__(
        self,
        positions: dict[str, float] | None = None,
        *,
        config: RobotConfig | None = None,
        kp: float = 20.0,
        kd: float = 0.5,
    ):
        ...
    
    def to_tensor(self, batch_size: int = 1) -> torch.Tensor:
        """Convert to (batch_size, max_action_dim) padded tensor."""
        ...
    
    def to_list(self) -> list[float]:
        """Convert to flat list of length action_dim."""
        ...
Source: rfx/python/rfx/decorators.py:66-131
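Conceptually, to_tensor and to_list place each named joint at its configured index and zero-pad the remaining slots up to the action dimension. The sketch below shows that padding idea in pure Python; the joint order and dimensions are made up for illustration and are not taken from SO101_CONFIG:

```python
def pad_positions(positions: dict, joint_order: list, max_action_dim: int) -> list:
    """Illustrative only: map named joints onto a fixed-length,
    zero-padded action vector, as MotorCommands.to_list is described."""
    vec = [0.0] * max_action_dim
    for name, value in positions.items():
        if name not in joint_order:
            raise ValueError(f"Joint {name!r} not found. Available: {joint_order}")
        vec[joint_order.index(name)] = value
    return vec

# Hypothetical joint order, for illustration only
order = ["shoulder", "elbow", "wrist", "gripper"]
action = pad_positions({"gripper": 0.8, "elbow": -0.3}, order, 8)
# -> [0.0, -0.3, 0.0, 0.8, 0.0, 0.0, 0.0, 0.0]
```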

Examples

import rfx

cmd = rfx.MotorCommands(
    {"gripper": 0.8, "elbow": -0.3},
    config=rfx.SO101_CONFIG
)

action = cmd.to_tensor()  # (1, 64)
robot.act(action)

Error Handling

import rfx

try:
    cmd = rfx.MotorCommands({"invalid_joint": 0.5}, config=rfx.SO101_CONFIG)
    action = cmd.to_tensor()
except ValueError as e:
    # Joint 'invalid_joint' not found in SO-101 config.
    # Available: shoulder, elbow, wrist, gripper, ...
    print(e)
Source: rfx/python/rfx/decorators.py:119-128

Observation Input

Policies receive a dictionary with these possible keys:
def policy(obs: dict[str, torch.Tensor]) -> torch.Tensor:
    state = obs["state"]        # Always present: (num_envs, max_state_dim)
    images = obs.get("images")  # Optional: (num_envs, num_cams, H, W, 3)
    lang = obs.get("language")  # Optional: (num_envs, seq_len)
    
    # Your logic here
    return action  # (num_envs, max_action_dim)
The "state" key is always present. Image and language modalities are optional and depend on your robot configuration.

Action Output

Policies must return a tensor with one of these shapes:
# Single-step action
torch.Tensor  # (num_envs, max_action_dim)

# Multi-step action (action chunking)
torch.Tensor  # (num_envs, horizon, max_action_dim)
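If a model only emits single-step actions, a chunked output can be built by adding a horizon dimension and tiling along it. A plain-torch sketch, assuming a horizon of 10:

```python
import torch

single = torch.zeros(1, 64)                    # (num_envs, max_action_dim)
chunk = single.unsqueeze(1).expand(1, 10, 64)  # (num_envs, horizon, max_action_dim)
```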

Single-Step Example

@rfx.policy
def simple_controller(obs):
    # Process state
    state = obs["state"]  # (1, 64)
    
    # Compute action
    action = model(state)  # (1, 64)
    
    return action

Action Chunking Example

@rfx.policy
def chunked_controller(obs):
    # Generate 10-step action sequence
    actions = model(obs["state"])  # (1, 10, 64)
    
    return actions  # Robot will execute all 10 steps

Deployment

Once decorated, deploy policies with the CLI:
# Deploy to real robot
rfx deploy my_policy.py --robot so101 --port /dev/ttyACM0

# Test in mock mode
rfx deploy my_policy.py --mock

# Load checkpoint
rfx deploy my_policy.py --checkpoint model.pt --robot so101
The rfx deploy command automatically discovers functions marked with @rfx.policy in your file.
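Discovery works because the decorator flags the callable (the same _rfx_policy attribute set by hand in the SimpleVLA example above). The sketch below shows one way such discovery could be implemented with the standard library; it is illustrative, not the actual rfx deploy code:

```python
import importlib.util

def find_policies(path: str) -> dict:
    """Illustrative sketch: load a Python file as a module and collect
    every callable carrying the _rfx_policy flag."""
    spec = importlib.util.spec_from_file_location("user_policy", path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return {
        name: obj
        for name, obj in vars(mod).items()
        if callable(obj) and getattr(obj, "_rfx_policy", False)
    }
```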

Loading Policies

Load policies from the rfx hub:
import rfx

# Load from hub
policy = rfx.load_policy("username/repo-name")

# Inspect before loading
info = rfx.inspect_policy("username/repo-name")
print(info.description)
print(info.robot_types)

# Use with robot
robot = rfx.MockRobot(state_dim=12, action_dim=6)
stats = rfx.run(robot, policy, rate_hz=50, duration=5.0)
Source: rfx/python/rfx/__init__.py:73

Best Practices

Disable gradient computation when running policies:
import torch

with torch.no_grad():
    action = policy(obs)
The Session and run() helpers do this automatically.
Source: rfx/python/rfx/session.py:220-222
Ensure your policy outputs match the robot’s dimensions:
@rfx.policy
def my_policy(obs):
    action = model(obs["state"])
    
    # Validate
    expected = (obs["state"].shape[0], 64)  # (num_envs, max_action_dim)
    assert action.shape == expected, f"Shape mismatch: {action.shape} != {expected}"
    
    return action
Always validate your policy in mock mode before deploying to hardware:
import rfx

# Test policy
robot = rfx.MockRobot(state_dim=12, action_dim=6)
stats = rfx.run(robot, my_policy, rate_hz=50, duration=5.0)

print(f"Completed {stats.iterations} steps")
print(f"Average period: {stats.avg_period_s * 1000:.2f} ms")

Related Pages

Robot Interface — Understand the Robot protocol
Observations — Work with multi-modal observations
Control Loop — Run policies with Session