The rfx.collection module provides a simple, LeRobot-native interface for collecting demonstration data from robots and pushing it directly to HuggingFace Hub.

collect

Collect episodes from a robot into a LeRobot dataset.
rfx.collection.collect(
    robot_type: str,
    repo_id: str,
    *,
    output: str | Path = "datasets",
    episodes: int = 1,
    duration_s: float | None = None,
    task: str = "default",
    fps: int = 30,
    state_dim: int = 6,
    camera_names: Sequence[str] = (),
    push_to_hub: bool = False,
    mcap: bool = False,
) -> Dataset
robot_type
str
required
Robot type identifier (e.g., “so101”)
repo_id
str
required
HuggingFace Hub repository ID (e.g., “my-org/demos”)
output
str | Path
default:"datasets"
Local directory to store the dataset
episodes
int
default:"1"
Number of episodes to collect
duration_s
float | None
default:"None"
Duration of each episode in seconds. If None, each episode runs until a keyboard interrupt.
task
str
default:"default"
Task label for the episodes
fps
int
default:"30"
Frames per second for dataset recording
state_dim
int
default:"6"
Dimensionality of the robot state vector
camera_names
Sequence[str]
default:"()"
Names of cameras to record
push_to_hub
bool
default:"False"
Whether to automatically push the dataset to HuggingFace Hub after collection
mcap
bool
default:"False"
Whether to also write MCAP sidecar files
return
Dataset
The collected dataset

Example

import rfx.collection

# Collect 10 episodes and push to hub
dataset = rfx.collection.collect(
    "so101",
    "my-org/demos",
    episodes=10,
    push_to_hub=True
)
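
As a rough sizing aid, the total number of frames in a fixed-duration collection follows from fps, duration_s, and episodes. The sketch below assumes exactly one frame is recorded per tick with no dropped frames, which this page does not guarantee. expected_frames is a hypothetical helper, not part of rfx.

```python
def expected_frames(fps: int, duration_s: float, episodes: int = 1) -> int:
    """Estimate total frames, assuming one frame per tick (an assumption)."""
    if fps <= 0 or duration_s < 0 or episodes < 0:
        raise ValueError("fps must be positive; duration_s and episodes non-negative")
    frames_per_episode = round(fps * duration_s)
    return frames_per_episode * episodes


# 10 episodes of 20 s each at the default 30 fps
total = expected_frames(fps=30, duration_s=20.0, episodes=10)
print(total)  # 6000
```

An estimate like this can help judge disk usage or upload time before starting a long session.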

open_dataset

Open an existing local LeRobot dataset.
rfx.collection.open_dataset(
    repo_id: str,
    *,
    root: str | Path = "datasets"
) -> Dataset
repo_id
str
required
Repository ID of the dataset
root
str | Path
default:"datasets"
Root directory containing datasets
return
Dataset
The opened dataset

Example

import rfx.collection

dataset = rfx.collection.open_dataset("my-org/demos")
print(f"Episodes: {dataset.num_episodes}")
print(f"Frames: {dataset.num_frames}")

Dataset

A LeRobot dataset wrapper with rfx helpers.

create

Create a new empty dataset.
Dataset.create(
    repo_id: str,
    *,
    root: str | Path = "datasets",
    fps: int = 30,
    robot_type: str = "so101",
    features: dict[str, Any] | None = None,
    state_dim: int = 6,
    camera_names: Sequence[str] = (),
    camera_shape: tuple[int, int, int] = (480, 640, 3),
    use_videos: bool = True,
) -> Dataset
repo_id
str
required
Repository ID for the dataset
root
str | Path
default:"datasets"
Root directory to store the dataset
fps
int
default:"30"
Frames per second
robot_type
str
default:"so101"
Robot type identifier
features
dict[str, Any] | None
default:"None"
Feature specification. If None, the specification is built automatically from state_dim and camera_names.
state_dim
int
default:"6"
Dimensionality of the state vector
camera_names
Sequence[str]
default:"()"
Names of cameras in the dataset
camera_shape
tuple[int, int, int]
default:"(480, 640, 3)"
Shape of camera images (height, width, channels)
use_videos
bool
default:"True"
Whether to encode images as videos
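
To illustrate what an auto-built feature specification could look like when features is None, here is a minimal sketch. It assumes rfx follows the key naming commonly seen in LeRobot datasets ("observation.state", "action", "observation.images.<camera>") and that the action has the same dimensionality as the state. Neither is confirmed by this page, and build_features is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any, Sequence


def build_features(
    state_dim: int = 6,
    camera_names: Sequence[str] = (),
    camera_shape: tuple[int, int, int] = (480, 640, 3),
    use_videos: bool = True,
) -> dict[str, Any]:
    """Hypothetical stand-in for the auto-built spec; key names are assumed."""
    features: dict[str, Any] = {
        "observation.state": {"dtype": "float32", "shape": (state_dim,)},
        # Assumption: actions share the state's dimensionality.
        "action": {"dtype": "float32", "shape": (state_dim,)},
    }
    for name in camera_names:
        features[f"observation.images.{name}"] = {
            "dtype": "video" if use_videos else "image",
            "shape": camera_shape,
            "names": ["height", "width", "channels"],
        }
    return features


features = build_features(state_dim=6, camera_names=("wrist", "overhead"))
for key, spec in features.items():
    print(key, spec["dtype"], spec["shape"])
```

Passing an explicit features dictionary to Dataset.create is the way to override whatever defaults the library actually generates.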

open

Open an existing local dataset.
Dataset.open(
    repo_id: str,
    *,
    root: str | Path = "datasets"
) -> Dataset

from_hub

Pull a dataset from HuggingFace Hub.
Dataset.from_hub(
    repo_id: str,
    *,
    root: str | Path = "datasets"
) -> Dataset

push

Push dataset to HuggingFace Hub.
dataset.push(repo_id: str | None = None) -> None
repo_id
str | None
default:"None"
Optional repository ID. If None, uses the dataset’s existing repo_id.

Properties

repo_id
str
Repository ID of the dataset
num_episodes
int
Number of episodes in the dataset
num_frames
int
Total number of frames in the dataset
fps
int
Frames per second of the dataset

summary

Return aggregate statistics about the dataset.
dataset.summary() -> dict[str, Any]
return
dict[str, Any]
Dictionary containing repo_id, num_episodes, num_frames, fps, and features
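
The keys listed above are enough to derive a few useful figures, such as total recorded time (num_frames / fps). The sketch below formats a summary dictionary into one line. describe is a hypothetical helper, and example_summary is a hand-written stand-in for the value returned by dataset.summary().

```python
from __future__ import annotations

from typing import Any


def describe(summary: dict[str, Any]) -> str:
    """Format a summary dict; duration is derived as num_frames / fps."""
    duration_s = summary["num_frames"] / summary["fps"]
    return (
        f"{summary['repo_id']}: {summary['num_episodes']} episodes, "
        f"{summary['num_frames']} frames, {duration_s:.1f} s at {summary['fps']} fps"
    )


# Hand-written stand-in for the value returned by dataset.summary()
example_summary = {
    "repo_id": "my-org/demos",
    "num_episodes": 10,
    "num_frames": 6000,
    "fps": 30,
    "features": {},
}
print(describe(example_summary))
# my-org/demos: 10 episodes, 6000 frames, 200.0 s at 30 fps
```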

validate

Run quality checks on the dataset.
dataset.validate(thresholds: Any = None) -> dict[str, Any]
thresholds
Any
default:"None"
Quality check thresholds
return
dict[str, Any]
Validation results including passed status and statistics
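
One common use of validation results is to gate an upload. The sketch below assumes the returned dictionary exposes a boolean under the key "passed". The exact key name is a guess based on the description above, so check the real return value before relying on it. push_if_valid is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any, Callable


def push_if_valid(result: dict[str, Any], push: Callable[[], None]) -> bool:
    """Call push() only when the validation result reports success.

    Assumes a boolean under the key "passed" (not confirmed by this page).
    """
    if result.get("passed", False):
        push()
        return True
    return False


# Stand-ins for dataset.validate() results and dataset.push
calls: list[str] = []
pushed = push_if_valid({"passed": True}, lambda: calls.append("pushed"))
skipped = push_if_valid({"passed": False}, lambda: calls.append("pushed"))
print(pushed, skipped, calls)  # True False ['pushed']
```

With a real dataset, the call would look like push_if_valid(dataset.validate(), dataset.push).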

Recorder

Real-time frame recorder that writes directly to a LeRobot Dataset. It is thread-safe and designed to be called from a teleop control loop.
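
To make the thread-safety claim concrete, here is a minimal in-memory sketch of one way a recorder can guard its state with a lock. SketchRecorder is a hypothetical class that only mirrors the method names documented on this page. It is not the rfx implementation and writes nothing to disk.

```python
from __future__ import annotations

import threading


class SketchRecorder:
    """In-memory stand-in showing lock-guarded episode state."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._frames: list[tuple[float, ...]] = []
        self._recording = False

    @property
    def is_recording(self) -> bool:
        with self._lock:
            return self._recording

    def start_episode(self) -> None:
        with self._lock:
            self._frames = []
            self._recording = True

    def add_frame(self, state: tuple[float, ...]) -> None:
        # The lock serializes appends coming from different threads.
        with self._lock:
            if not self._recording:
                raise RuntimeError("no active episode")
            self._frames.append(state)

    def save_episode(self) -> int:
        with self._lock:
            self._recording = False
            return len(self._frames)


recorder = SketchRecorder()
recorder.start_episode()


def worker() -> None:
    for i in range(250):
        recorder.add_frame((float(i),) * 6)


# Four threads add 250 frames each to the same episode.
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

saved = recorder.save_episode()
print(saved)  # 1000
```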

create

Create a recorder with a new dataset.
Recorder.create(
    repo_id: str,
    *,
    root: str | Path = "datasets",
    fps: int = 30,
    robot_type: str = "so101",
    state_dim: int = 6,
    camera_names: Sequence[str] = (),
    camera_shape: tuple[int, int, int] = (480, 640, 3),
    use_videos: bool = True,
    mcap: bool = False,
) -> Recorder
repo_id
str
required
Repository ID for the dataset
root
str | Path
default:"datasets"
Root directory to store the dataset
fps
int
default:"30"
Frames per second
robot_type
str
default:"so101"
Robot type identifier
state_dim
int
default:"6"
Dimensionality of the state vector
camera_names
Sequence[str]
default:"()"
Names of cameras to record
camera_shape
tuple[int, int, int]
default:"(480, 640, 3)"
Shape of camera images (height, width, channels)
use_videos
bool
default:"True"
Whether to encode images as videos
mcap
bool
default:"False"
Whether to also write MCAP sidecar files

start_episode

Begin a new episode.
recorder.start_episode(*, task: str = "default") -> None
task
str
default:"default"
Task label for the episode

add_frame

Add a single frame to the active episode. Thread-safe; intended to be called from the teleop control loop.
recorder.add_frame(
    *,
    state: np.ndarray,
    action: np.ndarray | None = None,
    images: dict[str, np.ndarray] | None = None,
) -> None
state
np.ndarray
required
Robot state vector
action
np.ndarray | None
default:"None"
Action vector. If None, the state vector is copied and used as the action.
images
dict[str, np.ndarray] | None
default:"None"
Dictionary mapping camera names to image arrays
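
The default for action described above can be sketched as follows. This version uses plain Python sequences so it runs without NumPy, and the length check against state_dim is an added assumption about what a caller might want to validate. resolve_action is a hypothetical helper, not an rfx function.

```python
from __future__ import annotations

from typing import Sequence


def resolve_action(
    state: Sequence[float],
    action: Sequence[float] | None = None,
    state_dim: int = 6,
) -> list[float]:
    """Return the action to store, falling back to a copy of the state."""
    if len(state) != state_dim:
        raise ValueError(f"expected {state_dim} state values, got {len(state)}")
    # Copy so later changes to the state buffer cannot alter the stored action.
    return list(state) if action is None else list(action)


state = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
print(resolve_action(state))             # copy of the state
print(resolve_action(state, [0.0] * 6))  # explicit action wins
```

With NumPy arrays, the equivalent fallback would be state.copy().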

save_episode

Finalize the current episode.
recorder.save_episode() -> int
return
int
Number of frames in the saved episode

push

Push the dataset to HuggingFace Hub.
recorder.push(repo_id: str | None = None) -> None

Properties

dataset
Dataset
The underlying dataset being recorded to
is_recording
bool
Whether an episode is currently being recorded

Example

from rfx.collection import Recorder
import numpy as np

recorder = Recorder.create(
    "my-org/demos",
    robot_type="so101",
    state_dim=6
)

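# Start an episode labeled with its task
recorder.start_episode(task="pick-place")

# Record 100 frames; random values stand in for real robot state
for _ in range(100):
    state = np.random.rand(6)
    recorder.add_frame(state=state)

# Finalize the episode (returns its frame count), then upload to the Hub
recorder.save_episode()
recorder.push()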
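
The loop in the example above runs as fast as Python allows. In a real teleop loop, frames are usually paced to the dataset's fps. Below is a minimal sketch of a fixed-rate loop. run_at_fps is a hypothetical helper, and whether rfx paces frames internally is not stated on this page.

```python
from __future__ import annotations

import time
from typing import Callable


def run_at_fps(
    step: Callable[[int], None],
    fps: int,
    num_frames: int,
    clock: Callable[[], float] = time.perf_counter,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call step(i) num_frames times, spaced 1/fps seconds apart."""
    period = 1.0 / fps
    start = clock()
    for i in range(num_frames):
        step(i)
        # Sleep until the next tick, measured from the start to avoid drift.
        remaining = start + (i + 1) * period - clock()
        if remaining > 0:
            sleep(remaining)


ticks: list[int] = []
run_at_fps(ticks.append, fps=200, num_frames=5)
print(ticks)  # [0, 1, 2, 3, 4]
```

In practice, step would read the robot state and call recorder.add_frame(state=...). The clock and sleep parameters exist only so the loop can be tested without waiting in real time.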
