Skip to main content

rfx.tf

Thread-safe transform buffer with BFS frame graph, timestamp interpolation, broadcaster, and listener. All transforms are published via pub/sub on rfx/tf/{parent}/{child}.
from rfx.tf import TransformBuffer, TransformBroadcaster, TransformListener

# Create buffer and broadcaster
buffer = TransformBuffer()
broadcaster = TransformBroadcaster(transport, buffer)

# Broadcast a transform
tf = TransformStamped(
    parent_frame="world",
    child_frame="robot_base",
    timestamp_ns=time.time_ns(),
    translation=(1.0, 0.0, 0.5),
    rotation=(0.0, 0.0, 0.0, 1.0)
)
broadcaster.send_transform(tf)

# Look up transform
tf = buffer.lookup("world", "robot_base")

TransformStamped

Represents a transform between two coordinate frames at a specific timestamp.

Constructor

TransformStamped(
    parent_frame: str,
    child_frame: str,
    timestamp_ns: int,
    translation: tuple[float, float, float],
    rotation: tuple[float, float, float, float]
)
parent_frame
str
required
Name of the parent frame.
child_frame
str
required
Name of the child frame.
timestamp_ns
int
required
Timestamp in nanoseconds.
translation
tuple[float, float, float]
required
Translation vector (x, y, z).
rotation
tuple[float, float, float, float]
required
Rotation as quaternion (x, y, z, w).

Methods

to_dict

tf.to_dict() -> dict
Serialize to a dictionary.
return
dict
Dictionary representation with all fields.

from_dict

@staticmethod
TransformStamped.from_dict(d: dict) -> TransformStamped
Deserialize from a dictionary.
d
dict
required
Dictionary containing transform data.
return
TransformStamped
Reconstructed TransformStamped.

identity

@staticmethod
TransformStamped.identity(parent: str, child: str) -> TransformStamped
Create an identity transform.
parent
str
required
Parent frame name.
child
str
required
Child frame name.
return
TransformStamped
Identity transform with zero translation and unit quaternion.

TransformBuffer

Thread-safe buffer of transforms with frame graph and BFS lookup.
from rfx.tf import TransformBuffer, TransformStamped
import time

buffer = TransformBuffer(max_history=100)

# Add transforms
tf1 = TransformStamped(
    parent_frame="world",
    child_frame="robot",
    timestamp_ns=time.time_ns(),
    translation=(1.0, 0.0, 0.0),
    rotation=(0.0, 0.0, 0.0, 1.0)
)
buffer.set_transform(tf1)

# Look up transform chain
tf = buffer.lookup("world", "end_effector")
if tf:
    print(f"Translation: {tf.translation}")

Constructor

TransformBuffer(max_history: int = 100)
max_history
int
default:"100"
Maximum number of transforms to store per frame pair.

Methods

set_transform

buffer.set_transform(tf: TransformStamped) -> None
Add a transform to the buffer.
tf
TransformStamped
required
Transform to add.

lookup

buffer.lookup(
    target_frame: str,
    source_frame: str
) -> TransformStamped | None
Look up transform from source_frame to target_frame using BFS.
target_frame
str
required
Target coordinate frame.
source_frame
str
required
Source coordinate frame.
return
TransformStamped | None
Transform from source to target, or None if no path exists.

all_frames

buffer.all_frames() -> list[str]
Get list of all known frames.
return
list[str]
Sorted list of frame names.

TransformBroadcaster

Publishes transforms to transport on rfx/tf/{parent}/{child}.
from rfx.tf import TransformBroadcaster, TransformBuffer

buffer = TransformBuffer()
broadcaster = TransformBroadcaster(transport, buffer)

tf = TransformStamped(...)
broadcaster.send_transform(tf)

Constructor

TransformBroadcaster(
    transport: Any,
    buffer: TransformBuffer | None = None
)
transport
Any
required
Transport layer for publishing (e.g., MQTT, ZeroMQ).
buffer
TransformBuffer | None
default:"None"
Optional buffer to also update locally when broadcasting.

Methods

send_transform

broadcaster.send_transform(tf: TransformStamped) -> None
Broadcast a transform.
tf
TransformStamped
required
Transform to broadcast.

StaticTransformBroadcaster

Broadcasts latched/static transforms with periodic republish.
from rfx.tf import StaticTransformBroadcaster

static_broadcaster = StaticTransformBroadcaster(
    transport,
    buffer,
    republish_interval_s=10.0
)

# Broadcast static transform (will be republished every 10s)
tf = TransformStamped(...)
static_broadcaster.send_transform(tf)

Constructor

StaticTransformBroadcaster(
    transport: Any,
    buffer: TransformBuffer | None = None,
    republish_interval_s: float = 10.0
)
transport
Any
required
Transport layer for publishing.
buffer
TransformBuffer | None
default:"None"
Optional buffer to update locally.
republish_interval_s
float
default:"10.0"
Interval in seconds to republish static transforms.

Methods

send_transform

static_broadcaster.send_transform(tf: TransformStamped) -> None
Broadcast a static transform (will be republished periodically).
tf
TransformStamped
required
Static transform to broadcast.

stop

static_broadcaster.stop() -> None
Stop the republish thread.

TransformListener

Subscribes to rfx/tf/** and populates a TransformBuffer.
from rfx.tf import TransformListener, TransformBuffer

buffer = TransformBuffer()
listener = TransformListener(transport, buffer)

# Buffer is now automatically updated as transforms are received
time.sleep(1.0)
tf = buffer.lookup("world", "robot")

Constructor

TransformListener(
    transport: Any,
    buffer: TransformBuffer
)
transport
Any
required
Transport layer for subscribing.
buffer
TransformBuffer
required
Buffer to populate with received transforms.

Methods

stop

listener.stop() -> None
Stop the listener thread.

broadcast_urdf_transforms

Broadcast all transforms from a URDF model given joint positions.
from rfx.tf import broadcast_urdf_transforms, TransformBroadcaster

broadcaster = TransformBroadcaster(transport)
joint_positions = {"joint1": 0.5, "joint2": -0.3}

broadcast_urdf_transforms(urdf, joint_positions, broadcaster)

Function Signature

broadcast_urdf_transforms(
    urdf: Any,
    joint_positions: dict[str, float] | list[float],
    broadcaster: TransformBroadcaster
) -> None
urdf
Any
required
URDF model object with forward_kinematics method.
joint_positions
dict[str, float] | list[float]
required
Joint positions as a dictionary mapping joint names to values, or a list of values.
broadcaster
TransformBroadcaster
required
Broadcaster to use for publishing transforms.

Build docs developers (and LLMs) love