The alpasim_runtime module orchestrates the AlpaSim simulation. It manages communication between gRPC services, executes rollouts, and handles logging and evaluation.
Installation
The runtime is installed as part of the AlpaSim workspace:
# From repository root
./setup_local_env.sh
source .venv/bin/activate
The runtime module is located at src/runtime/alpasim_runtime/.
Core Classes
UnboundRollout
Represents rollout metadata and configuration before execution.
from alpasim_runtime.loop import UnboundRollout
from alpasim_runtime.config import ScenarioConfig
# Build the rollout description from its four configuration inputs;
# nothing is connected to any service at this point.
unbound = UnboundRollout.create(
    scenario_config=scenario_cfg,
    user_sim_config=user_cfg,
    camera_configs=cameras,
    vehicle_config=vehicle_cfg,
)
scenario_config (ScenarioConfig): Scene ID, timestamps, and scenario metadata
user_sim_config
UserSimulatorConfig
required
Service endpoints and simulation parameters
camera_configs
list[RuntimeCameraConfig]
required
Camera specifications for rendering
vehicle_config: Vehicle dimensions and physics parameters
BoundRollout
Executable rollout bound to gRPC service connections.
from alpasim_runtime.dispatcher import Dispatcher
# Create service dispatcher
dispatcher = Dispatcher.create(network_config, scene_artifacts)
# Bind rollout to services
bound = await unbound.bind(
dispatcher = dispatcher,
vector_map = map_data,
traffic_objects = traffic_data
)
# Execute simulation
result = await bound.run()
Methods:
run(): Execute the rollout and return evaluation metrics (must be awaited)
Dispatcher
Manages gRPC service connections and load balancing.
from alpasim_runtime.dispatcher import Dispatcher
from alpasim_runtime.config import NetworkSimulatorConfig
dispatcher = Dispatcher.create(
network_config = NetworkSimulatorConfig(
sensorsim = EndpointAddresses( addresses = [ "localhost:50051" ]),
driver = EndpointAddresses( addresses = [ "localhost:50052" ]),
physics = EndpointAddresses( addresses = [ "localhost:50053" ]),
trafficsim = EndpointAddresses( addresses = [ "localhost:50054" ]),
controller = EndpointAddresses( addresses = [ "localhost:50055" ])
),
scene_artifacts = artifacts
)
# Get service connections
sensorsim = await dispatcher.get_sensorsim_service()
driver = await dispatcher.get_driver_service()
controller = await dispatcher.get_controller_service()
physics = await dispatcher.get_physics_service()
traffic = await dispatcher.get_traffic_service()
Configuration
ScenarioConfig
Defines a single simulation scenario.
from alpasim_runtime.config import ScenarioConfig
# Clip boundaries are given in microseconds (1 s to 10 s here).
scenario = ScenarioConfig(
    scene_id="waymo_sf_001",
    clip_start_time_us=1_000_000,
    clip_end_time_us=10_000_000,
    random_seed=42,
)
scene_id: Scene identifier (must match sensorsim/physics scenes)
clip_start_time_us: Simulation start time (microseconds)
clip_end_time_us: Simulation end time (microseconds)
RuntimeCameraConfig
Camera rendering configuration.
from alpasim_runtime.config import RuntimeCameraConfig
# All *_us fields are in microseconds.
camera = RuntimeCameraConfig(
    logical_id="camera_front_wide_120fov",
    height=720,
    width=1280,
    frame_interval_us=33_000,  # ~30fps
    shutter_duration_us=17_000,
    first_frame_offset_us=0,
)
logical_id: Camera identifier (must match sensorsim available cameras)
frame_interval_us: Time between frames (microseconds)
shutter_duration_us: Exposure duration (microseconds)
UserSimulatorConfig
Runtime behavior configuration.
from alpasim_runtime.config import (
UserSimulatorConfig,
PhysicsUpdateMode,
RouteGeneratorType
)
# Rendering and evaluation are both switched on; no egomotion noise model
# is configured (None).
config = UserSimulatorConfig(
    enable_rendering=True,
    enable_eval=True,
    physics_update_mode=PhysicsUpdateMode.ONCE_PER_STEP,
    route_generator_type=RouteGeneratorType.FROM_GROUND_TRUTH,
    egomotion_noise_model=None,
)
enable_rendering: Whether to render camera images
enable_eval: Whether to compute evaluation metrics
physics_update_mode:
ONCE_PER_STEP: Single physics update per timestep
TWICE_PER_STEP: Update before and after controller
route_generator_type:
FROM_GROUND_TRUTH: Use recorded route
FROM_MAP: Generate from road network
Service Wrappers
The runtime provides typed wrappers for each gRPC service:
ControllerService
from alpasim_runtime.services.controller_service import ControllerService
controller = ControllerService(grpc_stub, session_uuid)
# Start session
await controller.start_session( rig_file = "config.yaml" )
# Propagate vehicle
result = await controller.run_controller_and_vehicle(
state = current_state,
planned_trajectory = desired_path,
future_time_us = next_timestamp,
coerce_dynamic_state = False
)
# Access results
pose = result.pose_local_to_rig # PoseAtTime
dynamic_state = result.dynamic_state # DynamicState
DriverService
from alpasim_runtime.services.driver_service import DriverService
driver = DriverService(grpc_stub, session_uuid)
await driver.start_session(
random_seed = 42 ,
available_cameras = cameras
)
# Submit observations
await driver.submit_image(camera_id, timestamp, image_bytes)
await driver.submit_egomotion(trajectory, dynamic_state)
await driver.submit_route(waypoints, timestamp)
# Get planned trajectory
trajectory = await driver.drive(
time_now_us = current_time,
time_query_us = query_time
)
SensorsimService
from alpasim_runtime.services.sensorsim_service import (
SensorsimService,
ImageFormat
)
sensorsim = SensorsimService(grpc_stub)
# Render camera
image_bytes = await sensorsim.render_rgb(
scene_id = "scene_001" ,
camera_spec = camera_config,
frame_start_us = t_start,
frame_end_us = t_end,
sensor_pose = camera_pose,
dynamic_objects = traffic,
image_format = ImageFormat. JPEG ,
quality = 0.95
)
# Render LiDAR
point_cloud = await sensorsim.render_lidar(
scene_id = "scene_001" ,
lidar_type = "PANDAR128" ,
frame_start_us = t_start,
frame_end_us = t_end,
sensor_pose = lidar_pose,
dynamic_objects = traffic
)
Utility Classes
RouteGenerator
Generates route waypoints for the driver.
from alpasim_runtime.route_generator import RouteGenerator
from alpasim_runtime.config import RouteGeneratorType
# The generator is given both the recorded trajectory and the vector map;
# generator_type selects the mode.
generator = RouteGenerator(
    generator_type=RouteGeneratorType.FROM_GROUND_TRUTH,
    ground_truth_trajectory=recorded_path,
    vector_map=map_data,
)

# Query the route for the current time and ego pose.
waypoints = generator.get_route_at_time(
    timestamp_us=current_time,
    current_pose=ego_pose,
)
EgomotionNoiseModel
Adds realistic sensor noise to localization.
from alpasim_runtime.noise_models import EgomotionNoiseModel
from alpasim_runtime.config import EgomotionNoiseModelConfig
# Standard deviations carry their unit in the field name
# (metres, radians, metres per second).
noise_model = EgomotionNoiseModel(
    config=EgomotionNoiseModelConfig(
        position_stddev_m=0.05,
        rotation_stddev_rad=0.01,
        velocity_stddev_mps=0.1,
    ),
    random_seed=42,
)

# Apply the noise model to the ground-truth trajectory and dynamic state.
noisy_trajectory = noise_model.apply(
    true_trajectory=ground_truth,
    true_dynamic_state=true_state,
)
Logging
The runtime uses AlpaSim Simulation Log (ASL) format:
from alpasim_utils.logs import LogWriter
# Create log writer
log_writer = LogWriter( output_path = "rollout.asl" )
# Write entries during simulation
await log_writer.write_entry(
timestamp_us = current_time,
ego_pose = pose,
actor_poses = traffic_poses,
camera_images = images
)
# Close log
await log_writer.close()
Logs are written to the directory specified in the wizard configuration.
Example: Running a Rollout
Simple Rollout
Batch Processing
import asyncio
from alpasim_runtime.loop import UnboundRollout
from alpasim_runtime.dispatcher import Dispatcher
from alpasim_runtime.config import (
ScenarioConfig,
UserSimulatorConfig,
RuntimeCameraConfig
)
async def run_simulation():
    """Configure, bind, and run a single rollout, then print its metrics.

    The example builds one scenario and one camera, creates an unbound
    rollout and a dispatcher, binds the rollout to the dispatcher, runs it,
    and prints ``result.aggregated_metrics``.

    NOTE(review): ``user_config``, ``vehicle_config``, ``network_config``,
    ``artifacts``, ``map_data`` and ``traffic`` are not defined anywhere in
    this example; they must be provided by the surrounding code before this
    function is called.
    """
    # Configure scenario (clip times are in microseconds: 0 s to 10 s).
    scenario = ScenarioConfig(
        scene_id="waymo_sf_001",
        clip_start_time_us=0,
        clip_end_time_us=10_000_000,
        random_seed=42,
    )

    # Configure camera.
    camera = RuntimeCameraConfig(
        logical_id="camera_front_wide_120fov",
        height=720,
        width=1280,
        frame_interval_us=33000,
    )

    # Create unbound rollout; camera_configs takes a list, so the single
    # camera is wrapped in one.
    unbound = UnboundRollout.create(
        scenario_config=scenario,
        user_sim_config=user_config,
        camera_configs=[camera],
        vehicle_config=vehicle_config,
    )

    # Create dispatcher.
    dispatcher = Dispatcher.create(
        network_config=network_config,
        scene_artifacts=artifacts,
    )

    # Bind and run.
    bound = await unbound.bind(
        dispatcher=dispatcher,
        vector_map=map_data,
        traffic_objects=traffic,
    )
    result = await bound.run()

    # The f prefix must touch the opening quote; `f "..."` is a syntax error.
    print(f"Metrics: {result.aggregated_metrics}")


asyncio.run(run_simulation())