rfx.tf
Thread-safe transform buffer with BFS frame graph, timestamp interpolation, broadcaster, and listener. All transforms are published via pub/sub on rfx/tf/{parent}/{child}.
from rfx.tf import TransformBuffer, TransformBroadcaster, TransformListener
# Create buffer and broadcaster
buffer = TransformBuffer()
broadcaster = TransformBroadcaster(transport, buffer)
# Broadcast a transform
tf = TransformStamped(
parent_frame="world",
child_frame="robot_base",
timestamp_ns=time.time_ns(),
translation=(1.0, 0.0, 0.5),
rotation=(0.0, 0.0, 0.0, 1.0)
)
broadcaster.send_transform(tf)
# Look up transform
tf = buffer.lookup("world", "robot_base")
Represents a transform between two coordinate frames at a specific timestamp.
Constructor
TransformStamped(
parent_frame: str,
child_frame: str,
timestamp_ns: int,
translation: tuple[float, float, float],
rotation: tuple[float, float, float, float]
)
Name of the parent frame.
Timestamp in nanoseconds.
translation
tuple[float, float, float]
required
Translation vector (x, y, z).
rotation
tuple[float, float, float, float]
required
Rotation as quaternion (x, y, z, w).
Methods
to_dict
Serialize to a dictionary.
Dictionary representation with all fields.
from_dict
@staticmethod
TransformStamped.from_dict(d: dict) -> TransformStamped
Deserialize from a dictionary.
Dictionary containing transform data.
Reconstructed TransformStamped.
identity
@staticmethod
TransformStamped.identity(parent: str, child: str) -> TransformStamped
Create an identity transform.
Identity transform with zero translation and unit quaternion.
Thread-safe buffer of transforms with frame graph and BFS lookup.
from rfx.tf import TransformBuffer, TransformStamped
import time
buffer = TransformBuffer(max_history=100)
# Add transforms
tf1 = TransformStamped(
parent_frame="world",
child_frame="robot",
timestamp_ns=time.time_ns(),
translation=(1.0, 0.0, 0.0),
rotation=(0.0, 0.0, 0.0, 1.0)
)
buffer.set_transform(tf1)
# Look up transform chain
tf = buffer.lookup("world", "end_effector")
if tf:
print(f"Translation: {tf.translation}")
Constructor
TransformBuffer(max_history: int = 100)
Maximum number of transforms to store per frame pair.
Methods
buffer.set_transform(tf: TransformStamped) -> None
Add a transform to the buffer.
lookup
buffer.lookup(
target_frame: str,
source_frame: str
) -> TransformStamped | None
Look up transform from source_frame to target_frame using BFS.
Transform from source to target, or None if no path exists.
all_frames
buffer.all_frames() -> list[str]
Get list of all known frames.
Sorted list of frame names.
Publishes transforms to transport on rfx/tf/{parent}/{child}.
from rfx.tf import TransformBroadcaster, TransformBuffer
buffer = TransformBuffer()
broadcaster = TransformBroadcaster(transport, buffer)
tf = TransformStamped(...)
broadcaster.send_transform(tf)
Constructor
TransformBroadcaster(
transport: Any,
buffer: TransformBuffer | None = None
)
Transport layer for publishing (e.g., MQTT, ZeroMQ).
buffer
TransformBuffer | None
default:"None"
Optional buffer to also update locally when broadcasting.
Methods
broadcaster.send_transform(tf: TransformStamped) -> None
Broadcast a transform.
Broadcasts latched/static transforms with periodic republish.
from rfx.tf import StaticTransformBroadcaster
static_broadcaster = StaticTransformBroadcaster(
transport,
buffer,
republish_interval_s=10.0
)
# Broadcast static transform (will be republished every 10s)
tf = TransformStamped(...)
static_broadcaster.send_transform(tf)
Constructor
StaticTransformBroadcaster(
transport: Any,
buffer: TransformBuffer | None = None,
republish_interval_s: float = 10.0
)
Transport layer for publishing.
buffer
TransformBuffer | None
default:"None"
Optional buffer to update locally.
Interval in seconds to republish static transforms.
Methods
static_broadcaster.send_transform(tf: TransformStamped) -> None
Broadcast a static transform (will be republished periodically).
Static transform to broadcast.
stop
static_broadcaster.stop() -> None
Stop the republish thread.
Subscribes to rfx/tf/** and populates a TransformBuffer.
from rfx.tf import TransformListener, TransformBuffer
buffer = TransformBuffer()
listener = TransformListener(transport, buffer)
# Buffer is now automatically updated as transforms are received
time.sleep(1.0)
tf = buffer.lookup("world", "robot")
Constructor
TransformListener(
transport: Any,
buffer: TransformBuffer
)
Transport layer for subscribing.
Buffer to populate with received transforms.
Methods
stop
Stop the listener thread.
Broadcast all transforms from a URDF model given joint positions.
from rfx.tf import broadcast_urdf_transforms, TransformBroadcaster
broadcaster = TransformBroadcaster(transport)
joint_positions = {"joint1": 0.5, "joint2": -0.3}
broadcast_urdf_transforms(urdf, joint_positions, broadcaster)
Function Signature
broadcast_urdf_transforms(
urdf: Any,
joint_positions: dict[str, float] | list[float],
broadcaster: TransformBroadcaster
) -> None
URDF model object with forward_kinematics method.
joint_positions
dict[str, float] | list[float]
required
Joint positions as a dictionary mapping joint names to values, or a list of values.
broadcaster
TransformBroadcaster
required
Broadcaster to use for publishing transforms.