Skip to main content
The alpasim_runtime module orchestrates the AlpaSim simulation. It manages communication between gRPC services, executes rollouts, and handles logging and evaluation.

Installation

The runtime is installed as part of the AlpaSim workspace:
# From repository root
./setup_local_env.sh
source .venv/bin/activate
The runtime module is located at src/runtime/alpasim_runtime/.

Core Classes

UnboundRollout

Represents rollout metadata and configuration before execution.
from alpasim_runtime.loop import UnboundRollout
from alpasim_runtime.config import ScenarioConfig

# Create from scenario configuration
unbound = UnboundRollout.create(
    scenario_config=scenario_cfg,
    user_sim_config=user_cfg,
    camera_configs=cameras,
    vehicle_config=vehicle_cfg
)
scenario_config
ScenarioConfig
required
Scene ID, timestamps, and scenario metadata
user_sim_config
UserSimulatorConfig
required
Service endpoints and simulation parameters
camera_configs
list[RuntimeCameraConfig]
required
Camera specifications for rendering
vehicle_config
VehicleConfig
required
Vehicle dimensions and physics parameters

BoundRollout

Executable rollout bound to gRPC service connections.
from alpasim_runtime.dispatcher import Dispatcher

# Create service dispatcher
dispatcher = Dispatcher.create(network_config, scene_artifacts)

# Bind rollout to services
bound = await unbound.bind(
    dispatcher=dispatcher,
    vector_map=map_data,
    traffic_objects=traffic_data
)

# Execute simulation
result = await bound.run()
Methods:
run
async, returns ScenarioEvalResult
Execute the rollout and return evaluation metrics

Dispatcher

Manages gRPC service connections and load balancing.
from alpasim_runtime.dispatcher import Dispatcher
from alpasim_runtime.config import NetworkSimulatorConfig
# NOTE(review): EndpointAddresses is used below but is not imported in this
# snippet — presumably it also lives in alpasim_runtime.config; confirm and
# add the import before running.

# One EndpointAddresses entry per service; each takes a list of
# "host:port" strings.
dispatcher = Dispatcher.create(
    network_config=NetworkSimulatorConfig(
        sensorsim=EndpointAddresses(addresses=["localhost:50051"]),
        driver=EndpointAddresses(addresses=["localhost:50052"]),
        physics=EndpointAddresses(addresses=["localhost:50053"]),
        trafficsim=EndpointAddresses(addresses=["localhost:50054"]),
        controller=EndpointAddresses(addresses=["localhost:50055"])
    ),
    scene_artifacts=artifacts
)

# Get service connections
# (each getter is awaited, so this must run inside a coroutine)
sensorsim = await dispatcher.get_sensorsim_service()
driver = await dispatcher.get_driver_service()
controller = await dispatcher.get_controller_service()
physics = await dispatcher.get_physics_service()
traffic = await dispatcher.get_traffic_service()

Configuration

ScenarioConfig

Defines a single simulation scenario.
from alpasim_runtime.config import ScenarioConfig

scenario = ScenarioConfig(
    scene_id="waymo_sf_001",
    clip_start_time_us=1000000,
    clip_end_time_us=10000000,
    random_seed=42
)
scene_id
str
required
Scene identifier (must match sensorsim/physics scenes)
clip_start_time_us
int
required
Simulation start time (microseconds)
clip_end_time_us
int
required
Simulation end time (microseconds)
random_seed
int
Seed for reproducibility

RuntimeCameraConfig

Camera rendering configuration.
from alpasim_runtime.config import RuntimeCameraConfig

camera = RuntimeCameraConfig(
    logical_id="camera_front_wide_120fov",
    height=720,
    width=1280,
    frame_interval_us=33000,  # ~30fps
    shutter_duration_us=17000,
    first_frame_offset_us=0
)
logical_id
str
required
Camera identifier (must match sensorsim available cameras)
height
int
required
Image height in pixels
width
int
required
Image width in pixels
frame_interval_us
int
required
Time between frames (microseconds)
shutter_duration_us
int
required
Exposure duration (microseconds)

UserSimulatorConfig

Runtime behavior configuration.
from alpasim_runtime.config import (
    UserSimulatorConfig,
    PhysicsUpdateMode,
    RouteGeneratorType
)

config = UserSimulatorConfig(
    enable_rendering=True,
    enable_eval=True,
    physics_update_mode=PhysicsUpdateMode.ONCE_PER_STEP,
    route_generator_type=RouteGeneratorType.FROM_GROUND_TRUTH,
    egomotion_noise_model=None
)
enable_rendering
bool
Whether to render camera images
enable_eval
bool
Whether to compute evaluation metrics
physics_update_mode
PhysicsUpdateMode
  • ONCE_PER_STEP: Single physics update per timestep
  • TWICE_PER_STEP: Update before and after controller
route_generator_type
RouteGeneratorType
  • FROM_GROUND_TRUTH: Use recorded route
  • FROM_MAP: Generate from road network

Service Wrappers

The runtime provides typed wrappers for each gRPC service:

ControllerService

from alpasim_runtime.services.controller_service import ControllerService

controller = ControllerService(grpc_stub, session_uuid)

# Start session
await controller.start_session(rig_file="config.yaml")

# Propagate vehicle
result = await controller.run_controller_and_vehicle(
    state=current_state,
    planned_trajectory=desired_path,
    future_time_us=next_timestamp,
    coerce_dynamic_state=False
)

# Access results
pose = result.pose_local_to_rig  # PoseAtTime
dynamic_state = result.dynamic_state  # DynamicState

DriverService

from alpasim_runtime.services.driver_service import DriverService

driver = DriverService(grpc_stub, session_uuid)

await driver.start_session(
    random_seed=42,
    available_cameras=cameras
)

# Submit observations
await driver.submit_image(camera_id, timestamp, image_bytes)
await driver.submit_egomotion(trajectory, dynamic_state)
await driver.submit_route(waypoints, timestamp)

# Get planned trajectory
trajectory = await driver.drive(
    time_now_us=current_time,
    time_query_us=query_time
)

SensorsimService

from alpasim_runtime.services.sensorsim_service import (
    SensorsimService,
    ImageFormat
)

sensorsim = SensorsimService(grpc_stub)

# Render camera
image_bytes = await sensorsim.render_rgb(
    scene_id="scene_001",
    camera_spec=camera_config,
    frame_start_us=t_start,
    frame_end_us=t_end,
    sensor_pose=camera_pose,
    dynamic_objects=traffic,
    image_format=ImageFormat.JPEG,
    quality=0.95
)

# Render LiDAR
point_cloud = await sensorsim.render_lidar(
    scene_id="scene_001",
    lidar_type="PANDAR128",
    frame_start_us=t_start,
    frame_end_us=t_end,
    sensor_pose=lidar_pose,
    dynamic_objects=traffic
)

Utility Classes

RouteGenerator

Generates route waypoints for the driver.
from alpasim_runtime.route_generator import RouteGenerator
from alpasim_runtime.config import RouteGeneratorType

generator = RouteGenerator(
    generator_type=RouteGeneratorType.FROM_GROUND_TRUTH,
    ground_truth_trajectory=recorded_path,
    vector_map=map_data
)

waypoints = generator.get_route_at_time(
    timestamp_us=current_time,
    current_pose=ego_pose
)

EgomotionNoiseModel

Adds realistic sensor noise to localization.
from alpasim_runtime.noise_models import EgomotionNoiseModel
from alpasim_runtime.config import EgomotionNoiseModelConfig

noise_model = EgomotionNoiseModel(
    config=EgomotionNoiseModelConfig(
        position_stddev_m=0.05,
        rotation_stddev_rad=0.01,
        velocity_stddev_mps=0.1
    ),
    random_seed=42
)

# Add noise to true state
noisy_trajectory = noise_model.apply(
    true_trajectory=ground_truth,
    true_dynamic_state=true_state
)

Logging

The runtime uses the AlpaSim Simulation Log (ASL) format:
from alpasim_utils.logs import LogWriter

# Create log writer
log_writer = LogWriter(output_path="rollout.asl")

# Write entries during simulation
await log_writer.write_entry(
    timestamp_us=current_time,
    ego_pose=pose,
    actor_poses=traffic_poses,
    camera_images=images
)

# Close log
await log_writer.close()
Logs are written to the directory specified in the wizard configuration.

Example: Running a Rollout

import asyncio
from alpasim_runtime.loop import UnboundRollout
from alpasim_runtime.dispatcher import Dispatcher
from alpasim_runtime.config import (
    ScenarioConfig,
    UserSimulatorConfig,
    RuntimeCameraConfig
)

async def run_simulation():
    """Run a single rollout end to end and print its aggregated metrics.

    Builds the scenario and camera configuration, creates an unbound
    rollout, binds it to the gRPC services through a dispatcher, and
    executes it.

    This example expects the following names to be defined in the
    enclosing module before it is run:

    * ``user_config`` -- passed as ``user_sim_config`` (a UserSimulatorConfig)
    * ``vehicle_config`` -- passed as ``vehicle_config`` (a VehicleConfig)
    * ``network_config`` -- service endpoints for ``Dispatcher.create``
    * ``artifacts`` -- scene artifacts for ``Dispatcher.create``
    * ``map_data`` -- vector map handed to ``bind``
    * ``traffic`` -- traffic objects handed to ``bind``
    """
    # Configure scenario
    scenario = ScenarioConfig(
        scene_id="waymo_sf_001",
        clip_start_time_us=0,
        clip_end_time_us=10_000_000,
        random_seed=42,
    )

    # Configure camera
    camera = RuntimeCameraConfig(
        logical_id="camera_front_wide_120fov",
        height=720,
        width=1280,
        frame_interval_us=33000,  # ~30fps
        # shutter_duration_us is listed as a required field in the
        # RuntimeCameraConfig reference, so it must be supplied here too.
        shutter_duration_us=17000,
    )

    # Create unbound rollout
    unbound = UnboundRollout.create(
        scenario_config=scenario,
        user_sim_config=user_config,
        camera_configs=[camera],
        vehicle_config=vehicle_config,
    )

    # Create dispatcher
    dispatcher = Dispatcher.create(
        network_config=network_config,
        scene_artifacts=artifacts,
    )

    # Bind and run
    bound = await unbound.bind(
        dispatcher=dispatcher,
        vector_map=map_data,
        traffic_objects=traffic,
    )

    result = await bound.run()
    print(f"Metrics: {result.aggregated_metrics}")

asyncio.run(run_simulation())

Build docs developers (and LLMs) love