Skip to main content
Observations in rfx are dictionaries of tensors returned by robot.observe(). They support state, images, and language — all properly batched and padded.

Observation Dictionary

The standard format:
obs: dict[str, torch.Tensor] = {
    "state": torch.Tensor,     # (num_envs, max_state_dim) - Always present
    "images": torch.Tensor,    # (num_envs, num_cams, H, W, 3) - Optional
    "language": torch.Tensor   # (num_envs, seq_len) - Optional
}

State Tensor

Contains joint positions, velocities, and other proprioceptive data:
robot = rfx.MockRobot(state_dim=12, action_dim=6)
obs = robot.observe()

print(obs["state"].shape)  # torch.Size([1, 64])
print(robot.state_dim)     # 12 (actual DOF)
print(robot.max_state_dim) # 64 (padded for multi-embodiment)
The first state_dim elements contain real data; the rest are zero-padding.

Image Tensor

Optional RGB images from cameras:
obs = robot.observe()
if "images" in obs:
    images = obs["images"]  # (num_envs, num_cams, H, W, 3)
    print(images.shape)     # e.g., torch.Size([1, 2, 480, 640, 3])

Language Tensor

Optional tokenized instructions:
obs = robot.observe()
if "language" in obs:
    tokens = obs["language"]  # (num_envs, seq_len)
    print(tokens.shape)       # e.g., torch.Size([1, 77])

ObservationSpec

Defines the structure of observations:
from rfx import ObservationSpec

spec = ObservationSpec(
    state_dim=12,
    max_state_dim=64,
    image_shape=(480, 640, 3),
    num_cameras=2,
    language_dim=77
)

print(spec.has_images)    # True
print(spec.has_language)  # True
Source: rfx/python/rfx/observation.py:14-30

Properties

spec = ObservationSpec(
    state_dim=6,
    max_state_dim=64,
    num_cameras=0
)

spec.has_images    # False (num_cameras == 0)
spec.has_language  # False (language_dim is None)

Creating Observations

The make_observation() helper creates properly formatted dictionaries:
from rfx import make_observation
import torch

# State only
state = torch.randn(1, 12)  # (batch, state_dim)
obs = make_observation(
    state=state,
    state_dim=12,
    max_state_dim=64
)
print(obs["state"].shape)  # torch.Size([1, 64]) - padded!

# With images
images = torch.randint(0, 255, (1, 2, 480, 640, 3))  # (batch, cams, H, W, C)
obs = make_observation(
    state=state,
    state_dim=12,
    max_state_dim=64,
    images=images
)
print(obs["images"].shape)  # torch.Size([1, 2, 480, 640, 3])

# With language
language = torch.randint(0, 50000, (1, 77))  # (batch, seq_len)
obs = make_observation(
    state=state,
    state_dim=12,
    max_state_dim=64,
    images=images,
    language=language,
    device="cuda"
)
Source: rfx/python/rfx/observation.py:33-61

Automatic Padding

make_observation() handles padding automatically:
import torch
from rfx import make_observation

# State smaller than max_state_dim
state = torch.ones(1, 6)  # Only 6 elements

obs = make_observation(
    state=state,
    state_dim=6,
    max_state_dim=64
)

print(obs["state"].shape)    # torch.Size([1, 64])
print(obs["state"][0, :6])   # tensor([1., 1., 1., 1., 1., 1.])
print(obs["state"][0, 6:])   # tensor([0., 0., ..., 0.]) - zero padding
Source: rfx/python/rfx/observation.py:46-52

Unpadding Actions

When interfacing with hardware that expects unpadded actions:
from rfx import unpad_action
import torch

# Policy outputs padded action
action_padded = torch.randn(1, 64)  # (num_envs, max_action_dim)

# Extract actual action
action_actual = unpad_action(action_padded, action_dim=6)
print(action_actual.shape)  # torch.Size([1, 6])
Source: rfx/python/rfx/observation.py:64-71

Multi-Step Actions

unpad_action() handles action chunking:
import torch
from rfx import unpad_action

# Single-step
action = torch.randn(8, 64)  # (batch, max_action_dim)
unpadded = unpad_action(action, action_dim=6)
print(unpadded.shape)  # torch.Size([8, 6])

# Multi-step (action chunking)
action = torch.randn(8, 10, 64)  # (batch, horizon, max_action_dim)
unpadded = unpad_action(action, action_dim=6)
print(unpadded.shape)  # torch.Size([8, 10, 6])
Source: rfx/python/rfx/observation.py:64-71

ObservationBuffer

Buffer observations for frame stacking:
from rfx.observation import ObservationBuffer
import torch

buffer = ObservationBuffer(capacity=4)

# Push observations
for _ in range(4):
    obs = robot.observe()
    buffer.push(obs)

# Get stacked observations
stacked = buffer.get_stacked()
print(stacked["state"].shape)  # (num_envs, stack_size, max_state_dim)
Source: rfx/python/rfx/observation.py:74-103

Buffer Operations

from rfx.observation import ObservationBuffer

buffer = ObservationBuffer(capacity=3)

# Check size
print(len(buffer))  # 0

# Add observations
buffer.push(obs1)
buffer.push(obs2)
print(len(buffer))  # 2

# Clear
buffer.clear()
print(len(buffer))  # 0

Frame Stacking Example

import rfx
from rfx.observation import ObservationBuffer
import torch.nn as nn

class TemporalPolicy(nn.Module):
    """LSTM policy over a stack of recent observations.

    Each call to ``forward`` pushes the incoming observation into an
    ObservationBuffer.  Once the buffer holds ``stack_size`` frames, the
    stacked state tensors are run through an LSTM and the last hidden
    step is projected to a padded action of width 64 (max_action_dim).
    """

    def __init__(self, stack_size=4):
        super().__init__()
        self.buffer = ObservationBuffer(capacity=stack_size)
        # Input features = max_state_dim (64); batch_first so the input
        # is (num_envs, stack_size, 64).
        self.net = nn.LSTM(64, 256, batch_first=True)
        self.head = nn.Linear(256, 64)  # hidden -> max_action_dim

    def forward(self, obs):
        """Return a (num_envs, 64) action; zeros until history fills."""
        self.buffer.push(obs)

        if len(self.buffer) < self.buffer.capacity:
            # Not enough history yet: emit a zero action.  new_zeros keeps
            # device/dtype aligned with the observation and uses the real
            # batch size.  (The original `torch.zeros(1, 64)` also raised
            # NameError -- this snippet only imports `torch.nn`, never
            # `torch` itself.)
            state = obs["state"]
            return state.new_zeros(state.shape[0], 64)

        stacked = self.buffer.get_stacked()
        states = stacked["state"]  # (num_envs, stack_size, max_state_dim)

        out, _ = self.net(states)
        # Use only the final timestep's hidden state for the action.
        return self.head(out[:, -1, :])

Multi-Modal Policies

Handle images and language alongside state:
import torch
import torch.nn as nn
import rfx

class MultiModalPolicy(nn.Module):
    """Fuse state, image, and language observations into a padded action.

    Consumes the rfx observation dict: ``"state"`` is always present;
    ``"images"`` and ``"language"`` are optional.  Missing modalities are
    replaced by zero features so the fusion layer always receives a
    fixed-width input.
    """

    def __init__(self):
        super().__init__()
        self.state_encoder = nn.Linear(64, 256)
        self.image_encoder = nn.Conv2d(3, 64, kernel_size=3)
        self.lang_encoder = nn.Embedding(50000, 256)
        # 256 (state) + 64 (image) + 256 (language) -> 64 (max_action_dim)
        self.fusion = nn.Linear(256 + 64 + 256, 64)

    def forward(self, obs):
        """Map an observation dict to a (batch, 64) action tensor."""
        state_feat = self.state_encoder(obs["state"])  # (B, 256)
        batch = state_feat.shape[0]

        # Encode images (if present)
        if "images" in obs:
            img = obs["images"][:, 0]       # first camera: (B, H, W, 3)
            img = img.permute(0, 3, 1, 2)   # channels-first: (B, 3, H, W)
            # Camera images arrive as integer tensors (see the randint
            # example earlier in this doc); Conv2d requires float input.
            img_feat = self.image_encoder(img.float())
            img_feat = img_feat.mean(dim=[2, 3])  # global pool -> (B, 64)
        else:
            # new_zeros keeps device/dtype consistent with state_feat and
            # matches the actual batch size -- the previous hard-coded
            # torch.zeros(1, 64) broke torch.cat for batch > 1 or GPU.
            img_feat = state_feat.new_zeros(batch, 64)

        # Encode language (if present)
        if "language" in obs:
            lang = obs["language"]                           # (B, seq_len)
            lang_feat = self.lang_encoder(lang).mean(dim=1)  # (B, 256)
        else:
            lang_feat = state_feat.new_zeros(batch, 256)

        # Fuse all modality features and predict the padded action.
        combined = torch.cat([state_feat, img_feat, lang_feat], dim=-1)
        return self.fusion(combined)

Best Practices

Check for optional keys before accessing them — not all robots provide images or language:
def policy(obs):
    """Skeleton showing safe access to optional observation keys.

    ``"state"`` is always present in an rfx observation dict; ``"images"``
    and ``"language"`` must be checked for before use.
    """
    state = obs["state"]  # Always safe
    
    # Check before accessing
    if "images" in obs:
        images = obs["images"]
        # Process images
    
    if "language" in obs:
        lang = obs["language"]
        # Process language
    
    # NOTE(review): `action` is never assigned in this skeleton -- as
    # written, this line raises NameError.  A real implementation must
    # compute `action` from the features extracted above.
    return action
Most hardware drivers expect unpadded actions:
from rfx import unpad_action

# Policy outputs padded
action_padded = policy(obs)  # (1, 64)

# Unpad for hardware
action = unpad_action(action_padded, robot.action_dim)  # (1, 6)

# Send to driver
hardware_driver.send_command(action.numpy())
The ObservationBuffer clones observations automatically. If you build a custom buffer, clone each tensor yourself, since observation tensors may be reused by the robot between steps:
# Don't do this - obs tensors may be reused
buffer.append(obs)

# Do this - clone the data
buffer.append({k: v.clone() for k, v in obs.items()})
Source: rfx/python/rfx/observation.py:82

Robot Interface

Learn about the Robot protocol

Policies

Write policies that process observations

Control Loop

Run observation-action loops with Session

Build docs developers (and LLMs) love