The rfx.teleop module provides a high-rate teleoperation runtime for SO-101 robots with async camera streams, data recording, and transport integration.
run
Run a teleoperation session.
rfx.teleop.run(
    arm: ArmPairConfig | Sequence[ArmPairConfig],
    *,
    logging: bool = False,
    rate_hz: float | None = None,
    duration_s: float | None = None,
    cameras: Sequence[CameraStreamConfig] | None = None,
    transport: Any | None = None,
    data_output: str | Path | None = None,
    lineage: str | None = None,
    scale: str | None = None,
    format: str = "native",
    metadata: Mapping[str, Any] | None = None,
    otel: bool = False,
    otel_exporter: str = "console",
    otel_sample_every: int = 100,
    otlp_endpoint: str | None = None,
) -> RecordedEpisode | dict[str, Any] | None
arm
ArmPairConfig | Sequence[ArmPairConfig]
required
Arm pair configuration(s)
logging
bool
default:"False"
Enable a live per-loop movement trace in the terminal
rate_hz
float | None
default:"None"
Control loop rate in Hz
duration_s
float | None
default:"None"
Duration to run in seconds. If None, runs until keyboard interrupt.
cameras
Sequence[CameraStreamConfig] | None
default:"None"
Camera configurations for recording
transport
Any | None
default:"None"
Transport configuration for data publishing
data_output
str | Path | None
default:"None"
Directory to record data. If None, recording is disabled.
lineage
str | None
default:"None"
Lineage metadata for the recording
scale
str | None
default:"None"
Scale metadata for the recording
format
str
default:"native"
Export format: "native", "mcap", or "lerobot"
metadata
Mapping[str, Any] | None
default:"None"
Additional metadata for the recording
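otel
bool
default:"False"
Enable OpenTelemetry tracing
otel_exporter
str
default:"console"
OpenTelemetry exporter type
otel_sample_every
int
default:"100"
Sample telemetry every N iterations
otlp_endpoint
str | None
default:"None"
OTLP endpoint for telemetry export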
return
RecordedEpisode | dict[str, Any] | None
Recorded episode if data_output is set, otherwise None
Example
import rfx.teleop
arm = rfx.teleop.so101(
    leader_port="/dev/ttyACM0",
    follower_port="/dev/ttyACM1",
)
rfx.teleop.run(arm, data_output="recordings", duration_s=60.0)
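How rate_hz and duration_s interact can be pictured with a generic fixed-rate loop. The sketch below is an illustration only and is not the rfx implementation: run_fixed_rate and its clock and sleep parameters are hypothetical names introduced here, and the real runtime may schedule its loop differently.

```python
import time
from typing import Callable, Optional


def run_fixed_rate(
    step: Callable[[int], None],
    rate_hz: float,
    duration_s: Optional[float] = None,
    *,
    clock: Callable[[], float] = time.perf_counter,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call step(i) at rate_hz until duration_s elapses; return the iteration count.

    Hypothetical helper for illustration; it is not part of rfx.teleop.
    """
    if rate_hz <= 0:
        raise ValueError("rate_hz must be positive")
    period_s = 1.0 / rate_hz
    start = clock()
    iterations = 0
    try:
        # duration_s=None means "run until interrupted".
        while duration_s is None or clock() - start < duration_s:
            step(iterations)
            iterations += 1
            # Sleep until an absolute deadline so sleep error does not accumulate.
            deadline = start + iterations * period_s
            remaining = deadline - clock()
            if remaining > 0:
                sleep(remaining)
    except KeyboardInterrupt:
        pass  # Ctrl+C ends an open-ended run cleanly.
    return iterations
```

The clock and sleep functions are injectable so the loop can be exercised with a simulated clock instead of real time.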
so101
Beginner-friendly SO-101 arm spec factory.
rfx.teleop.so101(
    config: str | Mapping[str, Any] | None = None,
    **kwargs: Any
) -> ArmPairConfig
config
str | Mapping[str, Any] | None
default:"None"
Configuration dictionary or file path
**kwargs
Any
Additional configuration parameters (name, leader_port, follower_port)
Example
import rfx.teleop
# Auto-discovery
arm = rfx.teleop.so101()
# Explicit ports
arm = rfx.teleop.so101(
    leader_port="/dev/cu.usbmodemA",
    follower_port="/dev/cu.usbmodemB",
)
# From dict
arm = rfx.teleop.so101({
    "name": "main",
    "leader_port": "/dev/ttyACM0",
    "follower_port": "/dev/ttyACM1",
})
BimanualSo101Session
Python-first high-rate teleop session with async cameras and recorder integration.
Constructor
BimanualSo101Session(
    config: TeleopSessionConfig,
    *,
    recorder: LeRobotRecorder | None = None,
    collection_recorder: Any | None = None,
    pair_factory: Callable[[ArmPairConfig], ArmPair] | None = None,
    transport: TransportLike | None = None,
)
config
TeleopSessionConfig
required
Session configuration
recorder
LeRobotRecorder | None
default:"None"
Custom recorder instance
collection_recorder
Any | None
default:"None"
Collection recorder for integration with rfx.collection
pair_factory
Callable[[ArmPairConfig], ArmPair] | None
default:"None"
Custom arm pair factory
transport
TransportLike | None
default:"None"
Custom transport instance
from_ports
Create a bimanual session from explicit port specifications.
BimanualSo101Session.from_ports(
    *,
    left_leader_port: str = "/dev/ttyACM0",
    left_follower_port: str = "/dev/ttyACM1",
    right_leader_port: str = "/dev/ttyACM2",
    right_follower_port: str = "/dev/ttyACM3",
    **kwargs: Any,
) -> BimanualSo101Session
from_single_pair
Create a session from a single arm pair.
BimanualSo101Session.from_single_pair(
    *,
    leader_port: str = "/dev/ttyACM0",
    follower_port: str = "/dev/ttyACM1",
    **kwargs: Any,
) -> BimanualSo101Session
start
Start the teleoperation session.
stop
Stop the teleoperation session.
go_home
Send all follower arms to their home positions.
session.go_home() -> None
start_recording
Start recording an episode.
session.start_recording(
    *,
    label: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> str
metadata
Mapping[str, Any] | None
default:"None"
Additional metadata
stop_recording
Stop recording the current episode.
session.stop_recording() -> RecordedEpisode
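When start_recording and stop_recording are called by hand, it helps to guarantee that the recording is closed even if the code in between fails. The following is a minimal sketch of that pattern. record_manual_episode and its work callback are hypothetical names, not part of rfx.teleop, and the helper assumes only the start_recording, stop_recording, and check_health methods documented on this page. What the string returned by start_recording represents is not stated here, so the sketch simply passes it through.

```python
from typing import Any, Callable, Mapping, Optional


def record_manual_episode(
    session: Any,
    work: Callable[[str], None],
    *,
    label: Optional[str] = None,
    metadata: Optional[Mapping[str, Any]] = None,
) -> Any:
    """Record one episode around work(), always closing the recording.

    Hypothetical helper: `session` is any object exposing start_recording,
    stop_recording, and check_health as documented for BimanualSo101Session.
    """
    # start_recording is documented as returning a str; its meaning is not
    # specified, so it is forwarded to work() unchanged.
    recording_ref = session.start_recording(label=label, metadata=metadata)
    try:
        work(recording_ref)
    finally:
        # Stop even if work() raised, so a recording is never left open.
        episode = session.stop_recording()
    # check_health raises if the session hit errors while recording.
    session.check_health()
    return episode
```

For fixed-length captures, record_episode(duration_s=...) covers the same ground in one call; a wrapper like this is only useful when the stop condition is decided at run time.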
record_episode
Record an episode for a fixed duration.
session.record_episode(
    *,
    duration_s: float,
    label: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> RecordedEpisode
duration_s
float
required
Duration to record in seconds
metadata
Mapping[str, Any] | None
default:"None"
Additional metadata
timing_stats
Get loop timing statistics.
session.timing_stats() -> LoopTimingStats
return
LoopTimingStats
Timing statistics including jitter, overruns, and period measurements
reset_timing_stats
Reset accumulated loop timing counters.
session.reset_timing_stats() -> None
check_health
Check session health and raise if errors occurred.
session.check_health() -> None
run
Run teleoperation until Ctrl+C.
Properties
Whether the session is currently running
Whether an episode is currently being recorded
Example
from rfx.teleop import BimanualSo101Session
from rfx.teleop.config import TeleopSessionConfig
session = BimanualSo101Session.from_single_pair(
    leader_port="/dev/ttyACM0",
    follower_port="/dev/ttyACM1",
)
with session:
    episode = session.record_episode(duration_s=30.0, label="demo")
    print(f"Recorded {len(episode.control_steps)} control steps")
LoopTimingStats
Loop timing summary for jitter and overrun analysis.
@dataclass(frozen=True)
class LoopTimingStats:
    iterations: int
    overruns: int
    target_period_s: float
    avg_period_s: float
    p50_jitter_s: float
    p95_jitter_s: float
    p99_jitter_s: float
    max_jitter_s: float
iterations
int
Total number of control loop iterations
overruns
int
Number of iterations that exceeded the target period
avg_period_s
float
Average actual period in seconds
p95_jitter_s
float
95th percentile jitter in seconds
p99_jitter_s
float
99th percentile jitter in seconds
max_jitter_s
float
Maximum jitter in seconds
to_dict
Convert to a dictionary.
stats.to_dict() -> dict[str, float | int]
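To make the fields concrete, here is a minimal sketch of how a summary with the same shape could be derived from a list of measured loop periods. It rests on assumptions that this page does not confirm: jitter is taken as the absolute deviation of each period from the target, percentiles use the nearest-rank method, and an overrun is any period longer than the target. TimingSummary and summarize_periods are stand-in names, not the rfx classes.

```python
import math
from dataclasses import asdict, dataclass
from typing import Sequence


@dataclass(frozen=True)
class TimingSummary:
    """Stand-in with the same fields as LoopTimingStats (not the rfx class)."""

    iterations: int
    overruns: int
    target_period_s: float
    avg_period_s: float
    p50_jitter_s: float
    p95_jitter_s: float
    p99_jitter_s: float
    max_jitter_s: float


def _nearest_rank(sorted_values: Sequence[float], pct: float) -> float:
    # Nearest-rank percentile: the smallest sample such that at least
    # pct percent of all samples are less than or equal to it.
    rank = max(1, math.ceil(pct / 100.0 * len(sorted_values)))
    return sorted_values[rank - 1]


def summarize_periods(periods_s: Sequence[float], target_period_s: float) -> TimingSummary:
    if not periods_s:
        raise ValueError("need at least one period sample")
    # Assumption: jitter is |actual period - target period| per iteration.
    jitter = sorted(abs(p - target_period_s) for p in periods_s)
    return TimingSummary(
        iterations=len(periods_s),
        # Assumption: an overrun is a period strictly longer than the target.
        overruns=sum(1 for p in periods_s if p > target_period_s),
        target_period_s=target_period_s,
        avg_period_s=sum(periods_s) / len(periods_s),
        p50_jitter_s=_nearest_rank(jitter, 50),
        p95_jitter_s=_nearest_rank(jitter, 95),
        p99_jitter_s=_nearest_rank(jitter, 99),
        max_jitter_s=jitter[-1],
    )
```

Calling asdict on the result yields a flat dictionary of ints and floats, analogous in shape to what to_dict is documented to return.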
Configuration Types
ArmPairConfig
Configuration for a leader/follower arm pair.
@dataclass
class ArmPairConfig:
    name: str
    leader_port: str
    follower_port: str
CameraStreamConfig
Configuration for a camera stream.
@dataclass
class CameraStreamConfig:
    name: str
    device_id: int
    width: int
    height: int
    fps: int
TeleopSessionConfig
Configuration for a teleoperation session.
@dataclass
class TeleopSessionConfig:
    arm_pairs: tuple[ArmPairConfig, ...]
    rate_hz: float
    output_dir: Path
    cameras: tuple[CameraStreamConfig, ...]
    transport: TransportConfig
    max_timing_samples: int
Transport
create_transport
Create a transport instance from configuration.
rfx.teleop.create_transport(
    config: TransportConfig | None = None
) -> TransportLike
rust_transport_available
Check if Rust transport is available.
rfx.teleop.rust_transport_available() -> bool
zenoh_transport_available
Check if Zenoh transport is available.
rfx.teleop.zenoh_transport_available() -> bool
Benchmarking
run_jitter_benchmark
Run a jitter benchmark.
rfx.teleop.run_jitter_benchmark(
    rate_hz: float,
    duration_s: float
) -> JitterBenchmarkResult
duration_s
float
required
Duration to run the benchmark in seconds
return
JitterBenchmarkResult
Benchmark results including timing statistics
assert_jitter_budget
Assert that jitter is within acceptable bounds.
rfx.teleop.assert_jitter_budget(
    stats: LoopTimingStats,
    max_p99_jitter_s: float
) -> None
stats
LoopTimingStats
required
Timing statistics to check
max_p99_jitter_s
float
required
Maximum acceptable p99 jitter in seconds
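A budget check of this kind amounts to comparing one field against a threshold. The sketch below shows the idea with a stand-in function. check_jitter_budget is a hypothetical name, and both the exception type and the message format are assumptions; this page does not say what assert_jitter_budget raises.

```python
from typing import Any


def check_jitter_budget(stats: Any, max_p99_jitter_s: float) -> None:
    """Raise AssertionError when stats.p99_jitter_s exceeds the budget.

    Hypothetical stand-in for illustration; `stats` is any object with a
    p99_jitter_s attribute, such as a LoopTimingStats instance.
    """
    if max_p99_jitter_s < 0:
        raise ValueError("max_p99_jitter_s must be non-negative")
    p99 = stats.p99_jitter_s
    if p99 > max_p99_jitter_s:
        # Report in milliseconds, which is easier to read at control-loop scales.
        raise AssertionError(
            f"p99 jitter {p99 * 1e3:.3f} ms exceeds budget "
            f"{max_p99_jitter_s * 1e3:.3f} ms"
        )
```

A check like this fits naturally in a hardware smoke test: collect timing statistics for a short run, then fail the test if the p99 jitter is over budget.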