Alpamayo R1 uses diffusion models, specifically flow matching, to generate diverse and realistic vehicle trajectories. This approach enables the model to capture multimodal distributions of future motion while maintaining high-quality samples.
Overview
Diffusion models gradually denoise random noise into structured data through a learned reverse process. Alpamayo R1 implements flow matching, a modern diffusion technique that offers:
- Faster sampling: Straight paths in probability space reduce required steps
- Training stability: Direct velocity field prediction avoids noise schedule tuning
- Flexibility: Easy integration with conditional generation
Base Diffusion Interface
All diffusion models inherit from BaseDiffusion:
from alpamayo_r1.diffusion.base import BaseDiffusion, StepFn
class BaseDiffusion(ABC, nn.Module):
    def __init__(self, x_dims: list[int] | tuple[int] | int):
        """Initialize with output dimensions.

        Args:
            x_dims: Dimensions of the data to generate
        """
        super().__init__()
        self.x_dims = [x_dims] if isinstance(x_dims, int) else list(x_dims)

    @abstractmethod
    def sample(
        self,
        batch_size: int,
        step_fn: StepFn,
        device: torch.device = torch.device("cpu"),
        return_all_steps: bool = False,
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
        """Sample from the diffusion model.

        Args:
            batch_size: Number of samples to generate
            step_fn: Denoising function that predicts velocity field
            device: Device to run sampling on
            return_all_steps: Whether to return intermediate steps

        Returns:
            Final samples [B, *x_dims] or (all_steps [B, T, *x_dims], timesteps [T])
        """
See base.py:45-89 for the complete interface.
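As a small illustration of how x_dims is normalized and which shapes sample() is documented to return, the dependency-free sketch below mirrors the normalization line from __init__ and computes the expected shapes. The helper names normalize_x_dims and expected_shapes are hypothetical and not part of alpamayo_r1; T = num_steps + 1 is taken from the shape comments later on this page.

```python
def normalize_x_dims(x_dims):
    """Mirror of the normalization in BaseDiffusion.__init__ (int -> [int])."""
    return [x_dims] if isinstance(x_dims, int) else list(x_dims)


def expected_shapes(batch_size, x_dims, num_steps):
    """Shapes documented for sample(): final samples and the all-steps variant.

    Assumes T = num_steps + 1 (the initial noise plus one state per step).
    """
    dims = normalize_x_dims(x_dims)
    final = (batch_size, *dims)
    all_steps = (batch_size, num_steps + 1, *dims)
    timesteps = (num_steps + 1,)
    return final, all_steps, timesteps


final, all_steps, timesteps = expected_shapes(batch_size=32, x_dims=(64, 2), num_steps=10)
print(final)      # (32, 64, 2)
print(all_steps)  # (32, 11, 64, 2)
print(timesteps)  # (11,)
```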
Step Function Protocol
The step_fn is a callable that predicts the velocity field for the noisy data at each timestep:
class StepFn(Protocol):
    def __call__(
        self,
        *,
        x: torch.Tensor,  # Noisy data at current timestep
        t: torch.Tensor,  # Current timestep in [0, 1]
    ) -> torch.Tensor:
        """Returns predicted velocity field."""
In practice, this is typically a neural network conditioned on observations (camera images, LiDAR, etc.).
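The bare * in the protocol makes x and t keyword-only, so a sampler is expected to call step_fn(x=..., t=...). The sketch below illustrates that calling convention with plain Python floats instead of tensors. ScalarStepFn, toy_step_fn, and call_like_sampler are hypothetical names used only for illustration.

```python
from typing import Protocol


class ScalarStepFn(Protocol):
    """Hypothetical float-based analogue of StepFn (keyword-only x and t)."""

    def __call__(self, *, x: list[float], t: float) -> list[float]: ...


def toy_step_fn(*, x: list[float], t: float) -> list[float]:
    """Toy velocity field that pulls every coordinate toward 1.0."""
    return [(1.0 - xi) / max(1.0 - t, 1e-6) for xi in x]


def call_like_sampler(step_fn: ScalarStepFn, x: list[float], t: float) -> list[float]:
    # A sampler following the protocol passes both arguments by keyword.
    return step_fn(x=x, t=t)


velocity = call_like_sampler(toy_step_fn, x=[0.0, 0.5], t=0.0)
print(velocity)  # [1.0, 0.5]

try:
    toy_step_fn([0.0, 0.5], 0.0)  # positional call
except TypeError:
    print("positional call rejected")
```

A function declared without the bare *, such as def step_fn(x, t), still works with a keyword-calling sampler; the keyword-only form simply rules out accidental argument swaps.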
Flow Matching
FlowMatching is the primary diffusion implementation in Alpamayo R1:
from alpamayo_r1.diffusion.flow_matching import FlowMatching
diffusion = FlowMatching(
    x_dims=[64, 2],          # Shape: (n_waypoints, action_dim)
    int_method="euler",      # Integration method
    num_inference_steps=10,  # Number of denoising steps
)
How Flow Matching Works
Flow matching learns to transform noise into data by predicting velocity fields along optimal transport paths:
- Training: Learn a velocity field v(x, t) that pushes noise toward data
  - Start: x₀ ~ N(0, I) (random noise)
  - End: x₁ ~ p_data (real trajectory)
  - Path: x_t = t·x₁ + (1-t)·x₀ for t ∈ [0, 1]
  - Objective: Predict v(x_t, t) = x₁ - x₀
- Sampling: Integrate the learned velocity field from noise to data
  - Initialize: x ~ N(0, I)
  - Evolve: dx/dt = v(x, t) for t: 0 → 1
  - Result: Realistic trajectory sample
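The path and objective above can be checked numerically for a single noise/data pair. The sketch below uses scalar values chosen purely for illustration; interpolate and target_velocity are hypothetical helper names.

```python
def interpolate(x0: float, x1: float, t: float) -> float:
    """Point on the straight path x_t = t*x1 + (1 - t)*x0."""
    return t * x1 + (1.0 - t) * x0


def target_velocity(x0: float, x1: float) -> float:
    """Velocity target v = x1 - x0, constant along the path."""
    return x1 - x0


x0, x1 = -0.8, 2.4  # one noise draw and one data value (illustrative numbers)

# Endpoints: t=0 gives the noise sample, t=1 gives the data sample.
start = interpolate(x0, x1, 0.0)
end = interpolate(x0, x1, 1.0)

# A finite-difference slope of the path matches x1 - x0 at any t.
eps = 1e-6
t = 0.3
slope = (interpolate(x0, x1, t + eps) - interpolate(x0, x1, t - eps)) / (2 * eps)

# For this single pair the velocity is constant, so one Euler step of size 1
# starting at x0 lands on x1.
one_step = x0 + 1.0 * target_velocity(x0, x1)

print(start, end, slope, one_step)
```

The one-step result holds only for an individual pair. A trained model predicts an average of such targets over all pairs consistent with x_t, so in practice several integration steps are still needed.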
Sampling with Euler Integration
The sample() method uses forward Euler integration, shown here in the _euler helper:
# From flow_matching.py:89-127
def _euler(
    self,
    batch_size: int,
    step_fn: StepFn,
    device: torch.device,
    inference_step: int,
) -> torch.Tensor:
    # Start from random noise
    x = torch.randn(batch_size, *self.x_dims, device=device)
    # Time steps from 0 to 1
    time_steps = torch.linspace(0.0, 1.0, inference_step + 1, device=device)
    # Integrate velocity field
    for i in range(inference_step):
        dt = time_steps[i + 1] - time_steps[i]
        t_start = time_steps[i]
        # Predict velocity at current position and time
        v = step_fn(x=x, t=t_start)
        # Euler step: x_new = x_old + dt * v
        x = x + dt * v
    return x
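To see the same loop run end to end without a trained network, the following sketch reimplements it with plain Python lists and a velocity field that is known in closed form. It assumes a hypothetical data distribution consisting of the single point TARGET, for which the exact field is v(x, t) = (TARGET - x) / (1 - t). The names euler_sample and point_mass_step_fn are illustrative and not part of alpamayo_r1.

```python
import random


def euler_sample(step_fn, x_init, num_steps):
    """Forward Euler integration of dx/dt = v(x, t) from t=0 to t=1."""
    x = list(x_init)
    time_steps = [i / num_steps for i in range(num_steps + 1)]
    for i in range(num_steps):
        dt = time_steps[i + 1] - time_steps[i]
        v = step_fn(x=x, t=time_steps[i])
        # Euler step: x_new = x_old + dt * v
        x = [xi + dt * vi for xi, vi in zip(x, v)]
    return x


TARGET = [1.5, -0.5]  # hypothetical single "data point"


def point_mass_step_fn(*, x, t):
    """Exact velocity field when the data distribution is the single point TARGET."""
    # The loop only evaluates t <= 1 - 1/num_steps, so 1 - t is never zero.
    return [(c - xi) / (1.0 - t) for c, xi in zip(TARGET, x)]


rng = random.Random(0)
noise = [rng.gauss(0.0, 1.0) for _ in TARGET]
sample = euler_sample(point_mass_step_fn, noise, num_steps=10)
print(noise, "->", sample)
```

Whatever noise the run starts from, the result ends up at TARGET up to floating-point rounding, because the final Euler step covers exactly the remaining distance.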
Usage Example
Here's an end-to-end example of sampling trajectories. The names model, conditioning_data, history_xyz, and history_rot are placeholders that your own code must define:
import torch
from alpamayo_r1.diffusion.flow_matching import FlowMatching
from alpamayo_r1.action_space.unicycle_accel_curvature import (
    UnicycleAccelCurvatureActionSpace,
)

# Initialize diffusion model
diffusion = FlowMatching(
    x_dims=[64, 2],  # 64 waypoints, 2D actions (accel, curvature)
    num_inference_steps=10,
)

# Initialize action space
action_space = UnicycleAccelCurvatureActionSpace(
    n_waypoints=64,
    dt=0.1,
)

# Define step function (typically a trained neural network)
def step_fn(x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
    # x: (batch_size, 64, 2) - current noisy actions
    # t: (batch_size, 1, 1) - current timestep
    # Returns: (batch_size, 64, 2) - predicted velocity field
    # `model` stands in for a trained network and `conditioning_data` for
    # its encoded observations; both are placeholders in this example.
    return model(x, t, conditioning_data)

# Sample trajectories
sampled_actions = diffusion.sample(
    batch_size=32,
    step_fn=step_fn,
    device=torch.device("cuda"),
)

# Convert actions to trajectories
# (history_xyz and history_rot are the observed past poses, defined elsewhere)
future_xyz, future_rot = action_space.action_to_traj(
    sampled_actions,
    history_xyz,
    history_rot,
)
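For intuition about what converting (accel, curvature) actions into a trajectory involves, the sketch below integrates a textbook unicycle model with forward Euler steps. It is an assumption-laden illustration, not the implementation of action_to_traj: the function name unicycle_rollout, the update order, and the initial-state arguments are all hypothetical, and it ignores the history inputs and rotation outputs of the real API.

```python
import math


def unicycle_rollout(actions, dt, v0=0.0, x0=0.0, y0=0.0, yaw0=0.0):
    """Integrate (acceleration, curvature) pairs into (x, y, yaw) waypoints.

    Simple forward-Euler unicycle model; NOT the library's action_to_traj.
    """
    x, y, yaw, v = x0, y0, yaw0, v0
    waypoints = []
    for accel, curvature in actions:
        x += v * math.cos(yaw) * dt
        y += v * math.sin(yaw) * dt
        yaw += v * curvature * dt  # yaw rate = speed * curvature
        v += accel * dt
        waypoints.append((x, y, yaw))
    return waypoints


# 64 steps of zero acceleration and zero curvature at speed 5: a straight line.
straight = unicycle_rollout([(0.0, 0.0)] * 64, dt=0.1, v0=5.0)
# Constant positive curvature bends the path toward positive y.
left_turn = unicycle_rollout([(0.0, 0.05)] * 64, dt=0.1, v0=5.0)

print(straight[-1])
print(left_turn[-1])
```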
Configuration Options
| Parameter | Type | Default | Description |
|---|---|---|---|
| x_dims | list[int] | Required | Dimensions of output data |
| int_method | str | "euler" | Integration method (currently only "euler") |
| num_inference_steps | int | 10 | Number of denoising iterations |
These can be overridden at sampling time:
samples = diffusion.sample(
    batch_size=16,
    step_fn=step_fn,
    inference_step=20,   # Override default num_inference_steps
    int_method="euler",  # Override default int_method
)
For visualization or analysis, you can retrieve all intermediate denoising steps:
all_steps, time_steps = diffusion.sample(
    batch_size=1,
    step_fn=step_fn,
    device=device,
    return_all_steps=True,
)
# all_steps: (1, num_inference_steps+1, 64, 2)
# time_steps: (num_inference_steps+1,) in range [0, 1]

# Visualize denoising process
for i, (step, t) in enumerate(zip(all_steps[0], time_steps)):
    # step: (64, 2) - actions at this denoising step
    print(f"Step {i} at t={t.item():.2f}")
Training Flow Matching Models
While the sampling code is shown above, training typically follows this pattern:
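# Training loop (conceptual)
import torch
import torch.nn.functional as F

# `optimizer` is assumed to be a torch.optim optimizer over the model parameters
for batch in dataloader:
    # hist_xyz, hist_rot, fut_xyz, fut_rot, and observations are assumed
    # to be unpacked from `batch`
    # Get ground truth actions from trajectories
    gt_actions = action_space.traj_to_action(
        hist_xyz, hist_rot,
        fut_xyz, fut_rot,
    )
    # Sample random noise and one timestep per example
    noise = torch.randn_like(gt_actions)
    t = torch.rand(gt_actions.shape[0], 1, 1, device=gt_actions.device)
    # Create noisy interpolation: x_t = t*data + (1-t)*noise
    x_t = t * gt_actions + (1 - t) * noise
    # Target velocity is the difference
    target_v = gt_actions - noise
    # Predict velocity field
    pred_v = model(observations, x_t, t)
    # Loss: MSE between predicted and target velocity
    loss = F.mse_loss(pred_v, target_v)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()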
The key insight: the target velocity v = x₁ - x₀ is simply the direction from noise to data.
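The construction of training pairs can be tried on a toy problem without any framework. The sketch below assumes a hypothetical dataset containing the single value DATA_POINT, in which case the target x₁ - x₀ can be recovered from (x_t, t) alone as (DATA_POINT - x_t) / (1 - t). It compares the MSE of that closed-form predictor with a predictor that always outputs zero. All names are illustrative.

```python
import random

rng = random.Random(0)
DATA_POINT = 2.0  # hypothetical dataset containing a single value


def make_training_pair():
    """Draw (x_t, t, target_v) following the interpolation recipe."""
    noise = rng.gauss(0.0, 1.0)
    t = rng.uniform(0.0, 0.95)  # stay away from t=1 to keep the division stable
    x_t = t * DATA_POINT + (1.0 - t) * noise
    target_v = DATA_POINT - noise
    return x_t, t, target_v


def closed_form_velocity(x_t, t):
    """For single-point data, the target is a function of (x_t, t) alone."""
    return (DATA_POINT - x_t) / (1.0 - t)


def mse(predict, pairs):
    """Mean squared error of a predictor over (x_t, t, target_v) triples."""
    return sum((predict(x_t, t) - v) ** 2 for x_t, t, v in pairs) / len(pairs)


pairs = [make_training_pair() for _ in range(1000)]
loss_exact = mse(closed_form_velocity, pairs)
loss_zero = mse(lambda x_t, t: 0.0, pairs)
print(f"closed-form predictor MSE: {loss_exact:.2e}")
print(f"always-zero predictor MSE: {loss_zero:.2f}")
```

With real data the target is no longer a deterministic function of (x_t, t), so the loss does not reach zero; the network instead learns the average target at each (x_t, t).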
Inference Speed Considerations
Flow matching enables faster sampling than traditional diffusion models:
| Inference Steps | Latency (approx) | Quality |
|---|---|---|
| 1 | ~10ms | Low (single-step approximation) |
| 5 | ~50ms | Medium (good for real-time) |
| 10 | ~100ms | High (recommended default) |
| 20+ | ~200ms+ | Very high (diminishing returns) |
For autonomous driving, 5-10 steps typically provide the best speed/quality tradeoff.
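The step-count tradeoff can be illustrated on a toy problem where everything is known in closed form. The sketch below assumes scalar noise N(0, 1) and scalar data N(mu, sigma²) drawn independently, for which the exact velocity field is linear in x and the exact ODE solution ends at mu + sigma·x₀. It measures only the Euler discretization error for different step counts and says nothing about Alpamayo R1's actual latency or quality. The function names are hypothetical.

```python
def gaussian_velocity(x, t, mu, sigma):
    """Exact velocity field for noise N(0, 1) and data N(mu, sigma^2), independent."""
    var_t = (t * sigma) ** 2 + (1.0 - t) ** 2  # variance of x_t
    slope = (t * sigma**2 - (1.0 - t)) / var_t
    return mu + slope * (x - t * mu)


def euler_endpoint(x0, num_steps, mu, sigma):
    """Forward Euler from t=0 to t=1 with equal step sizes."""
    x = x0
    dt = 1.0 / num_steps
    for i in range(num_steps):
        x += dt * gaussian_velocity(x, i * dt, mu, sigma)
    return x


mu, sigma, x0 = 2.0, 0.5, 1.0
exact = mu + sigma * x0  # endpoint of the exact ODE solution

errors = {n: abs(euler_endpoint(x0, n, mu, sigma) - exact) for n in (1, 5, 10, 20)}
for n, err in errors.items():
    print(f"{n:>2} steps: |error| = {err:.4f}")
```

In this toy setting a single step maps every starting point to the data mean mu, which removes all sample diversity, and the error shrinks as the number of steps grows.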
Advanced: Conditional Generation
Flow matching naturally supports conditional generation by including observations in the step function:
def conditional_step_fn(x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
    # Encode the observations into conditioning features
    # (independent of x and t, so this can be computed once and reused)
    features = encoder(
        camera_images=images,   # Visual context
        lidar_points=lidar,     # 3D scene geometry
        history_traj=hist_xyz,  # Past motion
        map_features=map_data,  # HD map
    )
    # Predict velocity conditioned on all context
    velocity = denoiser(
        x=x,               # Current noisy actions
        t=t,               # Timestep
        context=features,  # Conditioning information
    )
    return velocity

# Sample trajectories conditioned on observations
actions = diffusion.sample(
    batch_size=32,
    step_fn=conditional_step_fn,
)
This conditioning allows the model to generate context-aware, scene-appropriate trajectories.
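Because the encoded observations do not depend on x or t, one way to avoid re-encoding them at every denoising step is to build the step function as a closure. The sketch below shows the pattern with stand-in functions that only count calls; make_cached_step_fn, toy_encoder, and toy_denoiser are hypothetical names, and the scalar arithmetic is purely illustrative.

```python
def make_cached_step_fn(encoder, denoiser, observations):
    """Encode observations once, then reuse the features at every step."""
    features = encoder(observations)  # runs a single time, outside the loop

    def step_fn(*, x, t):
        return denoiser(x=x, t=t, context=features)

    return step_fn


# Hypothetical stand-ins that count calls and return simple values.
calls = {"encoder": 0, "denoiser": 0}


def toy_encoder(observations):
    calls["encoder"] += 1
    return sum(observations) / len(observations)


def toy_denoiser(*, x, t, context):
    calls["denoiser"] += 1
    return context - x  # pull x toward the encoded context


step_fn = make_cached_step_fn(toy_encoder, toy_denoiser, observations=[1.0, 2.0, 3.0])

x, num_steps = 0.0, 10
for i in range(num_steps):
    x += (1.0 / num_steps) * step_fn(x=x, t=i / num_steps)

print(calls)  # {'encoder': 1, 'denoiser': 10}
```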
Best Practices
- Start with 10 inference steps: Good balance of speed and quality
- Use FP16/BF16: Mixed precision can speed up sampling 2x with minimal quality loss
- Batch inference: Process multiple samples in parallel for efficiency
- Cache features: If generating multiple samples for the same scene, encode observations once
- Compile models: Use torch.compile() for faster step function execution
Comparison to Other Diffusion Methods
| Method | Training | Sampling Speed | Implementation Complexity |
|---|---|---|---|
| DDPM | Stable | Slow (100+ steps) | Medium |
| DDIM | Stable | Medium (20-50 steps) | Medium |
| Flow Matching | Very stable | Fast (5-10 steps) | Low |
Flow matching’s straight paths in probability space (rather than noisy random walks) enable fewer sampling steps while maintaining quality.
References
See the source code at diffusion/base.py and diffusion/flow_matching.py for full implementation details.