Skip to main content
The alpasim_utils module provides shared utilities for AlpaSim, including trajectory manipulation, quaternion math, logging, and data structures.

Installation

The utils module is installed as part of the AlpaSim workspace:
# From repository root
./setup_local_env.sh
source .venv/bin/activate
The module is located at src/utils/alpasim_utils/.

Trajectory

The Trajectory class handles timestamped 3D poses.
from alpasim_utils.trajectory import Trajectory
from alpasim_utils.qvec import QVec
import numpy as np

# Create trajectory
timestamps = np.array([0, 100000, 200000], dtype=np.uint64)  # microseconds
poses = QVec(
    vec3=np.array([[0, 0, 0], [1, 0, 0], [2, 0, 0]], dtype=np.float32),
    quat=np.array([[1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0]], dtype=np.float32)
)

traj = Trajectory(timestamps_us=timestamps, poses=poses)
print(f"Length: {len(traj)}")
print(f"Time range: {traj.time_range_us}")

Key Methods

interpolate_to_timestamps
QVec
Interpolate poses to specific timestamps
query_times = np.array([50000, 150000], dtype=np.uint64)
interpolated = traj.interpolate_to_timestamps(query_times)
transform
Trajectory
Apply a transform to all poses
# World-frame transform (left multiply)
shifted = traj.transform(transform, is_relative=False)

# Body-frame transform (right multiply)
offset = traj.transform(transform, is_relative=True)
clip
Trajectory
Extract time slice
clipped = traj.clip(start_us=100000, end_us=300000)
from_grpc
staticmethod
Create from gRPC Trajectory message
from alpasim_grpc.v0.common_pb2 import Trajectory as TrajectoryProto

traj = Trajectory.from_grpc(grpc_trajectory)
to_grpc
Trajectory
Convert to gRPC message
grpc_msg = traj.to_grpc()

Properties

# Time range
print(traj.time_range_us)  # range(start, end)
print(traj.duration_us)    # end - start

# Check if empty
if traj.is_empty():
    print("No poses")

# Get velocities (numerical differentiation)
velocities = traj.get_velocities()  # (N, 3) array

QVec (Quaternion + Vector)

Batched quaternion and translation for efficient pose operations.
from alpasim_utils.qvec import QVec
import numpy as np

# Single pose
pose = QVec(
    vec3=np.array([1.0, 2.0, 3.0], dtype=np.float32),
    quat=np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32)  # w, x, y, z
)

# Batch of poses
poses = QVec(
    vec3=np.array([[0, 0, 0], [1, 0, 0], [2, 0, 0]], dtype=np.float32),
    quat=np.array([[1, 0, 0, 0], [1, 0, 0, 0], [1, 0, 0, 0]], dtype=np.float32)
)
print(f"Batch size: {poses.batch_size}")  # (3,)

Operations

# Composition (pose multiplication)
result = pose1 @ pose2

# Inverse
inv_pose = pose.inverse()

# Transform points
points = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32)
transformed = pose.apply(points)

# Interpolation (SLERP + linear)
interpolated = QVec.interpolate(pose1, pose2, alpha=0.5)

# From/to gRPC
from alpasim_grpc.v0.common_pb2 import Pose
grpc_pose = pose.to_grpc()
pose_from_grpc = QVec.from_grpc(grpc_pose)

Conversion

# To/from rotation matrix
rot_matrix = pose.to_rotation_matrix()  # (3, 3) or (N, 3, 3)
pose_from_mat = QVec.from_rotation_matrix(rot_matrix, translation)

# To/from homogeneous matrix
homogeneous = pose.to_homogeneous()  # (4, 4) or (N, 4, 4)

Scenario Data Structures

VehicleConfig

Vehicle dimensions and parameters:
from alpasim_utils.scenario import VehicleConfig

vehicle = VehicleConfig(
    aabb_x_m=4.5,         # Length
    aabb_y_m=2.0,         # Width
    aabb_z_m=1.5,         # Height
    aabb_x_offset_m=-2.5, # Rear axle offset
    aabb_y_offset_m=0.0,
    aabb_z_offset_m=0.0
)
aabb_x_m
float
required
Vehicle length (meters)
aabb_y_m
float
required
Vehicle width (meters)
aabb_z_m
float
required
Vehicle height (meters)
aabb_x_offset_m
float
X offset from rig origin to bounding box rear edge (typically negative)

AABB

Axis-aligned bounding box:
from alpasim_utils.scenario import AABB
from alpasim_grpc.v0.common_pb2 import AABB as AABBProto

aabb = AABB(size_x=4.5, size_y=2.0, size_z=1.5)

# Convert to gRPC
grpc_aabb = AABBProto(
    size_x=aabb.size_x,
    size_y=aabb.size_y,
    size_z=aabb.size_z
)

TrafficObjects

Background actors in a scene:
from alpasim_utils.scenario import TrafficObjects, TrafficObject

traffic = TrafficObjects([
    TrafficObject(
        object_id="vehicle_001",
        trajectory=traj1,
        aabb=AABB(size_x=4.5, size_y=2.0, size_z=1.5),
        is_static=False
    ),
    TrafficObject(
        object_id="pole_001",
        trajectory=traj2,
        aabb=AABB(size_x=0.3, size_y=0.3, size_z=3.0),
        is_static=True
    )
])

# Access by ID
obj = traffic.get_object("vehicle_001")

# Iterate
for obj in traffic:
    print(f"{obj.object_id}: {obj.aabb}")

Logging

LogWriter

Write AlpaSim Simulation Logs (ASL):
from alpasim_utils.logs import LogWriter
from alpasim_grpc.v0.logging_pb2 import LogEntry, ActorPoses

async with LogWriter("simulation.asl") as writer:
    # Write metadata
    await writer.write_metadata(
        scene_id="waymo_sf_001",
        session_uuid="abc-123",
        random_seed=42
    )
    
    # Write simulation steps
    for timestep in simulation:
        entry = LogEntry(
            timestamp_us=timestep.time,
            ego_pose=timestep.ego_pose.to_grpc(),
            actor_poses=ActorPoses(
                poses=[obj.pose.to_grpc() for obj in timestep.actors]
            )
        )
        await writer.write_entry(entry)

LogReader

Read ASL files:
from alpasim_utils.logs import LogReader

with LogReader("simulation.asl") as reader:
    metadata = reader.read_metadata()
    print(f"Scene: {metadata.scene_id}")
    
    for entry in reader:
        print(f"Time: {entry.timestamp_us}")
        print(f"Ego pose: {entry.ego_pose}")

Artifacts

Manage scene data and assets:
from alpasim_utils.artifact import Artifact

# Load scene artifact
artifact = Artifact.from_scene_id(
    scene_id="waymo_sf_001",
    cache_dir="/data/scenes"
)

# Access data
map_data = artifact.get_vector_map()
ground_mesh = artifact.get_ground_mesh()
cameras = artifact.get_available_cameras()
get_vector_map
VectorMap
Load road network data for the scene
get_ground_mesh
GroundMesh
Load ground geometry for physics
get_available_cameras
list[CameraSpec]
Get camera configurations from scene

Polyline

2D/3D polyline operations:
from alpasim_utils.polyline import Polyline
import numpy as np

points = np.array([[0, 0], [1, 1], [2, 0]], dtype=np.float32)
polyline = Polyline(points)

# Length
print(f"Total length: {polyline.length()}")

# Closest point
query = np.array([1.5, 0.5])
closest = polyline.closest_point(query)
print(f"Closest point: {closest}")

# Interpolate along path
sampled = polyline.interpolate(distances=np.linspace(0, polyline.length(), 100))

Path Utilities

from alpasim_utils.paths import extract_ids_from_path

# Extract scene/batch/rollout IDs from log path
path = "/logs/rollouts/waymo_sf_001/batch_0/rollout_3/simulation.asl"
clipgt_id, batch_id, rollout_id = extract_ids_from_path(path)
print(f"Scene: {clipgt_id}, Batch: {batch_id}, Rollout: {rollout_id}")
# Output: Scene: waymo_sf_001, Batch: batch_0, Rollout: rollout_3

Type Definitions

from alpasim_utils.types import (
    Timestamp,  # np.uint64
    Pose,       # QVec
    Transform,  # QVec
)

# Type-safe timestamp
time: Timestamp = np.uint64(1000000)  # 1 second in microseconds

Example: Complete Trajectory Processing

import numpy as np
from alpasim_utils.trajectory import Trajectory
from alpasim_utils.qvec import QVec
from alpasim_utils.scenario import VehicleConfig

# Create trajectory from raw data
timestamps = np.arange(0, 10_000_000, 100_000, dtype=np.uint64)  # 10s at 10Hz
positions = np.random.randn(100, 3).astype(np.float32)
quaternions = np.tile([1, 0, 0, 0], (100, 1)).astype(np.float32)

poses = QVec(vec3=positions, quat=quaternions)
traj = Trajectory(timestamps_us=timestamps, poses=poses)

# Transform to different frame
vehicle_to_sensor = QVec(
    vec3=np.array([0.5, 0, 1.5], dtype=np.float32),  # Sensor offset
    quat=np.array([1, 0, 0, 0], dtype=np.float32)
)
sensor_traj = traj.transform(vehicle_to_sensor, is_relative=True)

# Extract time slice
clipped = traj.clip(start_us=2_000_000, end_us=8_000_000)

# Interpolate to specific times
query_times = np.array([2_500_000, 5_000_000, 7_500_000], dtype=np.uint64)
interpolated = clipped.interpolate_to_timestamps(query_times)

# Compute velocities
velocities = clipped.poses.get_velocities()  # Numerical differentiation
speeds = np.linalg.norm(velocities, axis=1)
print(f"Max speed: {speeds.max():.2f} m/s")

# Convert to gRPC for transmission
grpc_traj = clipped.to_grpc()

CLI Tools

The utils module provides command-line tools for working with AlpaSim data. Print ASL log entries for debugging and inspection.
# Print all entries
print-asl rollout.asl

# Print specific range
print-asl rollout.asl --start 10 --end 100

# Print only specific message types
print-asl rollout.asl --message-types actor_poses drive_response

# Print only message types (not content)
print-asl rollout.asl --just-types
asl_file
string
required
Path to the .asl file to print
--start
int
default:"0"
Index of the first message to print
--end
int
default:"None"
Index of the last message to print (default: print all)
--message-types
list
Message types to filter (e.g., actor_poses, drive_response, camera_image)
--just-types
flag
Only print message types, not full content

asl-to-frames

Extract camera frames from ASL logs and save as video or images.
# Extract frames as MP4 video
asl-to-frames rollout.asl --output-format mp4

# Extract as individual JPEG images
asl-to-frames rollout.asl --output-format jpeg

# Process multiple files with glob
asl-to-frames "rollouts/**/*.asl" --output-format mp4

# Specify output directory
asl-to-frames rollout.asl --output-dir ./frames --output-format png
Output is saved to <asl_file_dir>/<asl_file_name>_asl_frames/<camera_id>/<mp4_or_images>.
asl_file
string
required
Path or glob pattern for .asl files
--output-format
string
default:"mp4"
Output format: mp4, jpeg, or png
--output-dir
string
Custom output directory (default: same as input file)
Requires imageio[ffmpeg] for video output. Install with: pip install imageio[ffmpeg]

Build docs developers (and LLMs) love