The rfx.teleop module provides a high-rate teleoperation runtime for SO-101 robots with async camera streams, data recording, and transport integration.

run

Run a teleoperation session.
rfx.teleop.run(
    arm: ArmPairConfig | Sequence[ArmPairConfig],
    *,
    logging: bool = False,
    rate_hz: float | None = None,
    duration_s: float | None = None,
    cameras: Sequence[CameraStreamConfig] | None = None,
    transport: Any | None = None,
    data_output: str | Path | None = None,
    lineage: str | None = None,
    scale: str | None = None,
    format: str = "native",
    metadata: Mapping[str, Any] | None = None,
    otel: bool = False,
    otel_exporter: str = "console",
    otel_sample_every: int = 100,
    otlp_endpoint: str | None = None,
) -> RecordedEpisode | dict[str, Any] | None
arm
ArmPairConfig | Sequence[ArmPairConfig]
required
Arm pair configuration(s)
logging
bool
default:"False"
Enable live per-loop movement trace in terminal
rate_hz
float | None
default:"None"
Control loop rate in Hz
duration_s
float | None
default:"None"
Duration to run in seconds. If None, runs until keyboard interrupt.
cameras
Sequence[CameraStreamConfig] | None
default:"None"
Camera configurations for recording
transport
Any | None
default:"None"
Transport configuration for data publishing
data_output
str | Path | None
default:"None"
Directory to record data. If None, recording is disabled.
lineage
str | None
default:"None"
Lineage metadata for the recording
scale
str | None
default:"None"
Scale metadata for the recording
format
str
default:"native"
Export format: "native", "mcap", or "lerobot"
metadata
Mapping[str, Any] | None
default:"None"
Additional metadata for the recording
otel
bool
default:"False"
Enable OpenTelemetry tracing
otel_exporter
str
default:"console"
OpenTelemetry exporter type
otel_sample_every
int
default:"100"
Sample telemetry every N iterations
otlp_endpoint
str | None
default:"None"
OTLP endpoint for telemetry export
return
RecordedEpisode | dict[str, Any] | None
Recorded episode if data_output is set, otherwise None

Example

import rfx.teleop

arm = rfx.teleop.so101(
    leader_port="/dev/ttyACM0",
    follower_port="/dev/ttyACM1"
)

rfx.teleop.run(arm, data_output="recordings", duration_s=60.0)
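
To make the interplay of rate_hz, duration_s, and otel_sample_every concrete, here is a minimal sketch of a fixed-rate loop. It is not the rfx implementation: run_fixed_rate and its clock and sleep parameters are hypothetical names, and the deadline-based scheduling is an assumption about how such a loop is commonly written.

```python
import time
from typing import Callable


def run_fixed_rate(
    step: Callable[[int], None],
    *,
    rate_hz: float,
    duration_s: float,
    sample_every: int = 100,
    clock: Callable[[], float] = time.perf_counter,
    sleep: Callable[[float], None] = time.sleep,
) -> list[int]:
    """Call step(i) at rate_hz for duration_s; return the sampled iteration indices.

    Hypothetical helper for illustration only; it is not part of rfx.teleop.
    """
    period_s = 1.0 / rate_hz
    start = clock()
    sampled: list[int] = []
    i = 0
    while clock() - start < duration_s:
        step(i)
        # Emit telemetry for every Nth iteration only, to keep overhead low.
        if i % sample_every == 0:
            sampled.append(i)
        i += 1
        # Sleep until the next absolute deadline so timing errors do not accumulate.
        remaining = start + i * period_s - clock()
        if remaining > 0:
            sleep(remaining)
    return sampled
```

Passing clock and sleep as arguments keeps the sketch testable without real time passing; a production loop would more likely use the defaults.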

so101

Beginner-friendly factory that builds an SO-101 arm pair configuration.
rfx.teleop.so101(
    config: str | Mapping[str, Any] | None = None,
    **kwargs: Any
) -> ArmPairConfig
config
str | Mapping[str, Any] | None
default:"None"
Configuration dictionary or file path
kwargs
Any
Additional configuration parameters (name, leader_port, follower_port)
return
ArmPairConfig
Configured arm pair

Example

import rfx.teleop

# Auto-discovery
arm = rfx.teleop.so101()

# Explicit ports
arm = rfx.teleop.so101(
    leader_port="/dev/cu.usbmodemA",
    follower_port="/dev/cu.usbmodemB"
)

# From dict
arm = rfx.teleop.so101({
    "name": "main",
    "leader_port": "/dev/ttyACM0",
    "follower_port": "/dev/ttyACM1"
})
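
The three call styles above suggest that a config mapping and keyword arguments are merged into one ArmPairConfig. The sketch below shows one way that merge could work. The ArmPairConfig class here is a local stand-in that mirrors the documented fields, make_arm_pair is a hypothetical name, and both the fallback values and the rule that keyword arguments override the mapping are assumptions. The real so101 factory may behave differently, for example by auto-discovering ports.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping


@dataclass
class ArmPairConfig:
    """Local stand-in mirroring the documented fields; not imported from rfx."""

    name: str
    leader_port: str
    follower_port: str


# Assumed fallbacks for illustration; the real factory may auto-discover ports.
_DEFAULTS = {
    "name": "main",
    "leader_port": "/dev/ttyACM0",
    "follower_port": "/dev/ttyACM1",
}


def make_arm_pair(
    config: Mapping[str, Any] | None = None, **kwargs: Any
) -> ArmPairConfig:
    """Merge defaults, then the mapping, then keyword arguments (assumed order)."""
    merged = {**_DEFAULTS, **(config or {}), **kwargs}
    unknown = set(merged) - set(_DEFAULTS)
    if unknown:
        raise TypeError(f"unexpected arm options: {sorted(unknown)}")
    return ArmPairConfig(**merged)
```

Rejecting unknown keys early turns a misspelled option such as leader_prot into an immediate error instead of a silently ignored setting.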

BimanualSo101Session

Python-first high-rate teleop session with async cameras and recorder integration.

Constructor

BimanualSo101Session(
    config: TeleopSessionConfig,
    *,
    recorder: LeRobotRecorder | None = None,
    collection_recorder: Any | None = None,
    pair_factory: Callable[[ArmPairConfig], ArmPair] | None = None,
    transport: TransportLike | None = None,
)
config
TeleopSessionConfig
required
Session configuration
recorder
LeRobotRecorder | None
default:"None"
Custom recorder instance
collection_recorder
Any | None
default:"None"
Collection recorder for integration with rfx.collection
pair_factory
Callable[[ArmPairConfig], ArmPair] | None
default:"None"
Custom arm pair factory
transport
TransportLike | None
default:"None"
Custom transport instance

from_ports

Create a bimanual session from explicit port specifications.
BimanualSo101Session.from_ports(
    *,
    left_leader_port: str = "/dev/ttyACM0",
    left_follower_port: str = "/dev/ttyACM1",
    right_leader_port: str = "/dev/ttyACM2",
    right_follower_port: str = "/dev/ttyACM3",
    **kwargs: Any,
) -> BimanualSo101Session

from_single_pair

Create a session from a single arm pair.
BimanualSo101Session.from_single_pair(
    *,
    leader_port: str = "/dev/ttyACM0",
    follower_port: str = "/dev/ttyACM1",
    **kwargs: Any,
) -> BimanualSo101Session

start

Start the teleoperation session.
session.start() -> None

stop

Stop the teleoperation session.
session.stop() -> None

go_home

Send all follower arms to their home positions.
session.go_home() -> None

start_recording

Start recording an episode.
session.start_recording(
    *,
    label: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> str
label
str | None
default:"None"
Label for the episode
metadata
Mapping[str, Any] | None
default:"None"
Additional metadata
return
str
Episode ID

stop_recording

Stop recording the current episode.
session.stop_recording() -> RecordedEpisode
return
RecordedEpisode
The recorded episode

record_episode

Record an episode for a fixed duration.
session.record_episode(
    *,
    duration_s: float,
    label: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> RecordedEpisode
duration_s
float
required
Duration to record in seconds
label
str | None
default:"None"
Episode label
metadata
Mapping[str, Any] | None
default:"None"
Additional metadata
return
RecordedEpisode
The recorded episode

timing_stats

Get loop timing statistics.
session.timing_stats() -> LoopTimingStats
return
LoopTimingStats
Timing statistics including jitter, overruns, and period measurements

reset_timing_stats

Reset accumulated loop timing counters.
session.reset_timing_stats() -> None

check_health

Check session health and raise if errors occurred.
session.check_health() -> None

run

Run teleoperation until Ctrl+C.
session.run() -> None

Properties

is_running
bool
Whether the session is currently running
is_recording
bool
Whether an episode is currently being recorded

Example

from rfx.teleop import BimanualSo101Session

session = BimanualSo101Session.from_single_pair(
    leader_port="/dev/ttyACM0",
    follower_port="/dev/ttyACM1"
)

with session:
    episode = session.record_episode(duration_s=30.0, label="demo")
    print(f"Recorded {len(episode.control_steps)} control steps")
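
When several labelled episodes are needed in a row, record_episode and check_health can be combined in a small helper. The sketch below relies only on the method signatures documented on this page. record_batch and the RecordingSession protocol are hypothetical names, and calling check_health after every episode is a design assumption rather than documented behaviour.

```python
from __future__ import annotations

from typing import Any, Mapping, Protocol, Sequence


class RecordingSession(Protocol):
    """The subset of the documented session interface this helper uses."""

    def record_episode(
        self,
        *,
        duration_s: float,
        label: str | None = None,
        metadata: Mapping[str, Any] | None = None,
    ) -> Any: ...

    def check_health(self) -> None: ...


def record_batch(
    session: RecordingSession,
    labels: Sequence[str],
    *,
    duration_s: float,
) -> list[Any]:
    """Record one fixed-duration episode per label, stopping at the first error."""
    episodes: list[Any] = []
    for label in labels:
        episodes.append(session.record_episode(duration_s=duration_s, label=label))
        # check_health is documented to raise if errors occurred in the session.
        session.check_health()
    return episodes
```

Inside a with session: block this could be called as record_batch(session, ["pick", "place"], duration_s=30.0).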

LoopTimingStats

Loop timing summary for jitter and overrun analysis.
@dataclass(frozen=True)
class LoopTimingStats:
    iterations: int
    overruns: int
    target_period_s: float
    avg_period_s: float
    p50_jitter_s: float
    p95_jitter_s: float
    p99_jitter_s: float
    max_jitter_s: float
iterations
int
Total number of control loop iterations
overruns
int
Number of iterations that exceeded the target period
target_period_s
float
Target period in seconds
avg_period_s
float
Average actual period in seconds
p50_jitter_s
float
Median jitter in seconds
p95_jitter_s
float
95th percentile jitter in seconds
p99_jitter_s
float
99th percentile jitter in seconds
max_jitter_s
float
Maximum jitter in seconds
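
The fields above can be read as a summary of the measured loop periods. The following sketch derives them from a list of loop timestamps. It uses a local stand-in dataclass with the documented field names and makes three assumptions that the page does not confirm: jitter is the absolute deviation of a period from the target, an overrun is any period longer than the target, and percentiles use the nearest-rank method. The summarize function and the asdict-based to_dict are likewise illustrative.

```python
from __future__ import annotations

import math
from dataclasses import asdict, dataclass
from typing import Sequence


@dataclass(frozen=True)
class LoopTimingStats:
    """Local stand-in with the documented fields; not imported from rfx."""

    iterations: int
    overruns: int
    target_period_s: float
    avg_period_s: float
    p50_jitter_s: float
    p95_jitter_s: float
    p99_jitter_s: float
    max_jitter_s: float

    def to_dict(self) -> dict[str, float | int]:
        return asdict(self)


def _percentile(sorted_values: Sequence[float], pct: float) -> float:
    # Nearest-rank percentile on an already sorted sequence.
    rank = max(1, math.ceil(pct / 100.0 * len(sorted_values)))
    return sorted_values[rank - 1]


def summarize(timestamps_s: Sequence[float], rate_hz: float) -> LoopTimingStats:
    """Summarize loop timing from the start time of each iteration."""
    if len(timestamps_s) < 2:
        raise ValueError("need at least two timestamps to measure a period")
    target = 1.0 / rate_hz
    periods = [b - a for a, b in zip(timestamps_s, timestamps_s[1:])]
    jitter = sorted(abs(p - target) for p in periods)
    return LoopTimingStats(
        iterations=len(periods),
        overruns=sum(p > target for p in periods),
        target_period_s=target,
        avg_period_s=sum(periods) / len(periods),
        p50_jitter_s=_percentile(jitter, 50),
        p95_jitter_s=_percentile(jitter, 95),
        p99_jitter_s=_percentile(jitter, 99),
        max_jitter_s=jitter[-1],
    )
```

For a 4 Hz loop with timestamps 0.0, 0.25, 0.5, 1.0, 1.25, the single late iteration produces one overrun and a maximum jitter of 0.25 s, while the median jitter stays at 0.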

to_dict

Convert to a dictionary.
stats.to_dict() -> dict[str, float | int]

Configuration Types

ArmPairConfig

Configuration for a leader/follower arm pair.
@dataclass
class ArmPairConfig:
    name: str
    leader_port: str
    follower_port: str

CameraStreamConfig

Configuration for a camera stream.
@dataclass
class CameraStreamConfig:
    name: str
    device_id: int
    width: int
    height: int
    fps: int
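
Because width, height, and fps together determine how much data a stream produces, it can help to estimate the raw throughput before adding cameras to a session. The sketch below uses a local stand-in for CameraStreamConfig and assumes uncompressed frames at 3 bytes per pixel; raw_bytes_per_second is a hypothetical helper, and the actual on-disk size depends on the encoding the recorder uses.

```python
from dataclasses import dataclass


@dataclass
class CameraStreamConfig:
    """Local stand-in mirroring the documented fields; not imported from rfx."""

    name: str
    device_id: int
    width: int
    height: int
    fps: int


def raw_bytes_per_second(camera: CameraStreamConfig, bytes_per_pixel: int = 3) -> int:
    """Uncompressed throughput of one stream (assumes 3 bytes per pixel by default)."""
    return camera.width * camera.height * bytes_per_pixel * camera.fps


wrist = CameraStreamConfig(name="wrist", device_id=0, width=640, height=480, fps=30)
print(raw_bytes_per_second(wrist))  # 27648000, roughly 27.6 MB/s before compression
```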

TeleopSessionConfig

Configuration for a teleoperation session.
@dataclass
class TeleopSessionConfig:
    arm_pairs: tuple[ArmPairConfig, ...]
    rate_hz: float
    output_dir: Path
    cameras: tuple[CameraStreamConfig, ...]
    transport: TransportConfig
    max_timing_samples: int

Transport

create_transport

Create a transport instance from configuration.
rfx.teleop.create_transport(
    config: TransportConfig | None = None
) -> TransportLike

rust_transport_available

Check if Rust transport is available.
rfx.teleop.rust_transport_available() -> bool

zenoh_transport_available

Check if Zenoh transport is available.
rfx.teleop.zenoh_transport_available() -> bool

Benchmarking

run_jitter_benchmark

Run a jitter benchmark.
rfx.teleop.run_jitter_benchmark(
    rate_hz: float,
    duration_s: float
) -> JitterBenchmarkResult
rate_hz
float
required
Target loop rate in Hz
duration_s
float
required
Duration to run benchmark in seconds
return
JitterBenchmarkResult
Benchmark results including timing statistics

assert_jitter_budget

Assert that jitter is within acceptable bounds.
rfx.teleop.assert_jitter_budget(
    stats: LoopTimingStats,
    max_p99_jitter_s: float
) -> None
stats
LoopTimingStats
required
Timing statistics to check
max_p99_jitter_s
float
required
Maximum acceptable p99 jitter in seconds
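
A budget check of this kind usually amounts to one comparison that fails loudly. The sketch below shows that idea with a hypothetical check_jitter_budget function and a minimal stand-in stats class. Raising AssertionError and the wording of the message are assumptions; the exception type used by rfx.teleop.assert_jitter_budget is not stated on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimingSummary:
    """Minimal stand-in carrying only the field this check needs."""

    p99_jitter_s: float


def check_jitter_budget(stats: TimingSummary, max_p99_jitter_s: float) -> None:
    """Raise if the observed p99 jitter exceeds the budget (assumed behaviour)."""
    if stats.p99_jitter_s > max_p99_jitter_s:
        raise AssertionError(
            f"p99 jitter {stats.p99_jitter_s * 1e3:.3f} ms exceeds "
            f"budget {max_p99_jitter_s * 1e3:.3f} ms"
        )


# A loop with 0.5 ms p99 jitter passes a 1 ms budget without raising.
check_jitter_budget(TimingSummary(p99_jitter_s=0.0005), max_p99_jitter_s=0.001)
```

A check like this fits naturally in a hardware-in-the-loop test, where a jitter regression should fail the run rather than only appear in logs.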
