Alpamayo R1 uses diffusion models, specifically flow matching, to generate diverse and realistic vehicle trajectories. This approach enables the model to capture multimodal distributions of future motion while maintaining high-quality samples.

Overview

Diffusion models gradually denoise random noise into structured data through a learned reverse process. Alpamayo R1 implements flow matching, a modern diffusion technique that offers:
  • Faster sampling: Straight paths in probability space reduce required steps
  • Training stability: Direct velocity field prediction avoids noise schedule tuning
  • Flexibility: Easy integration with conditional generation

Base Diffusion Interface

All diffusion models inherit from BaseDiffusion:
from alpamayo_r1.diffusion.base import BaseDiffusion, StepFn

class BaseDiffusion(ABC, nn.Module):
    def __init__(self, x_dims: list[int] | tuple[int] | int):
        """Initialize with output dimensions.
        
        Args:
            x_dims: Dimensions of the data to generate
        """
        super().__init__()
        self.x_dims = [x_dims] if isinstance(x_dims, int) else list(x_dims)
    
    @abstractmethod
    def sample(
        self,
        batch_size: int,
        step_fn: StepFn,
        device: torch.device = torch.device("cpu"),
        return_all_steps: bool = False,
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
        """Sample from the diffusion model.
        
        Args:
            batch_size: Number of samples to generate
            step_fn: Denoising function that predicts velocity field
            device: Device to run sampling on
            return_all_steps: Whether to return intermediate steps
            
        Returns:
            Final samples [B, *x_dims] or (all_steps [B, T, *x_dims], timesteps [T])
        """
See base.py:45-89 for the complete interface.
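To make the contract concrete, here is a self-contained toy subclass. `BaseDiffusion` is re-declared locally so the snippet runs without the `alpamayo_r1` package, and `ConstantFlow` is a hypothetical example class, not part of the library:

```python
# Self-contained sketch: BaseDiffusion is re-declared locally so this runs
# without alpamayo_r1; the real interface lives in diffusion/base.py.
from abc import ABC, abstractmethod
from typing import Callable

import torch
import torch.nn as nn

StepFn = Callable[..., torch.Tensor]  # called with keyword args x, t


class BaseDiffusion(ABC, nn.Module):
    def __init__(self, x_dims):
        super().__init__()
        self.x_dims = [x_dims] if isinstance(x_dims, int) else list(x_dims)

    @abstractmethod
    def sample(self, batch_size, step_fn, device=torch.device("cpu"),
               return_all_steps=False):
        ...


class ConstantFlow(BaseDiffusion):
    """Toy subclass: a single Euler step from noise along the predicted velocity."""

    def sample(self, batch_size, step_fn, device=torch.device("cpu"),
               return_all_steps=False):
        x = torch.randn(batch_size, *self.x_dims, device=device)
        t = torch.zeros((), device=device)
        # One full-length Euler step: x1 = x0 + 1.0 * v(x0, 0)
        return x + step_fn(x=x, t=t)


toy = ConstantFlow(x_dims=[64, 2])
out = toy.sample(batch_size=4, step_fn=lambda *, x, t: -x)  # v = -x drives samples to 0
print(out.shape)  # torch.Size([4, 64, 2])
```

Any real subclass follows the same shape: draw noise with `self.x_dims`, then repeatedly query `step_fn` to move toward data.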

Step Function Protocol

The step_fn is a callable that denoises data at each timestep:
from typing import Protocol

import torch

class StepFn(Protocol):
    def __call__(
        self,
        *,
        x: torch.Tensor,  # Noisy data at current timestep
        t: torch.Tensor,  # Current timestep in [0, 1]
    ) -> torch.Tensor:
        """Returns the predicted velocity field."""
        ...
In practice, this is typically a neural network conditioned on observations (camera images, LiDAR, etc.).
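For illustration, a minimal network that satisfies the protocol might look like the following. `TinyVelocityNet` and its layer sizes are hypothetical; a real step function would also consume encoded observations:

```python
import torch
import torch.nn as nn


class TinyVelocityNet(nn.Module):
    """Minimal StepFn-compatible module (hypothetical, unconditioned)."""

    def __init__(self, n_waypoints: int = 64, action_dim: int = 2, hidden: int = 128):
        super().__init__()
        self.n_waypoints = n_waypoints
        self.action_dim = action_dim
        in_dim = n_waypoints * action_dim + 1  # flattened actions + timestep
        self.net = nn.Sequential(
            nn.Linear(in_dim, hidden),
            nn.SiLU(),
            nn.Linear(hidden, n_waypoints * action_dim),
        )

    def forward(self, *, x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
        b = x.shape[0]
        # Broadcast the scalar timestep to a per-sample feature column
        t_col = t.expand(b).reshape(b, 1).to(x.dtype)
        flat = torch.cat([x.reshape(b, -1), t_col], dim=-1)
        return self.net(flat).reshape(b, self.n_waypoints, self.action_dim)


step_fn = TinyVelocityNet()
v = step_fn(x=torch.randn(8, 64, 2), t=torch.tensor(0.3))
print(v.shape)  # torch.Size([8, 64, 2])
```

Because `nn.Module.__call__` forwards keyword arguments to `forward`, the module instance itself can be passed anywhere a `StepFn` is expected.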

Flow Matching

FlowMatching is the primary diffusion implementation in Alpamayo R1:
from alpamayo_r1.diffusion.flow_matching import FlowMatching

diffusion = FlowMatching(
    x_dims=[64, 2],           # Shape: (n_waypoints, action_dim)
    int_method="euler",       # Integration method
    num_inference_steps=10,   # Number of denoising steps
)

How Flow Matching Works

Flow matching learns to transform noise into data by predicting velocity fields along optimal transport paths:
  1. Training: Learn a velocity field v(x, t) that pushes noise toward data
    • Start: x₀ ~ N(0, I) (random noise)
    • End: x₁ ~ p_data (real trajectory)
    • Path: x_t = t·x₁ + (1-t)·x₀ for t ∈ [0, 1]
    • Objective: Predict v(x_t, t) = x₁ - x₀
  2. Sampling: Integrate the learned velocity field from noise to data
    • Initialize: x ~ N(0, I)
    • Evolve: dx/dt = v(x, t) for t: 0 → 1
    • Result: Realistic trajectory sample
This approach builds on the flow matching framework of Lipman et al., "Flow Matching for Generative Modeling" (ICLR 2023).
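The straight-line interpolation and velocity target above can be checked numerically: from any point x_t on the path, following the constant velocity v = x₁ − x₀ for the remaining time 1 − t lands exactly on the data point.

```python
import torch

torch.manual_seed(0)
x0 = torch.randn(4, 64, 2)   # noise sample
x1 = torch.randn(4, 64, 2)   # stands in for a real trajectory
t = 0.3

# Straight-line interpolant and its (constant) velocity target
x_t = t * x1 + (1 - t) * x0
v = x1 - x0

# Following v for the remaining time (1 - t) recovers the data exactly
reconstructed = x_t + (1 - t) * v
print(torch.allclose(reconstructed, x1, atol=1e-6))  # True
```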

Sampling with Euler Integration

The sample() method implements forward Euler integration:
# From flow_matching.py:89-127
def _euler(
    self,
    batch_size: int,
    step_fn: StepFn,
    device: torch.device,
    inference_step: int,
) -> torch.Tensor:
    # Start from random noise
    x = torch.randn(batch_size, *self.x_dims, device=device)
    
    # Time steps from 0 to 1
    time_steps = torch.linspace(0.0, 1.0, inference_step + 1, device=device)
    
    # Integrate velocity field
    for i in range(inference_step):
        dt = time_steps[i + 1] - time_steps[i]
        t_start = time_steps[i]
        
        # Predict velocity at current position and time
        v = step_fn(x=x, t=t_start)
        
        # Euler step: x_new = x_old + dt * v
        x = x + dt * v
    
    return x
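Because the target paths are straight, Euler integration of a constant velocity field is exact regardless of the step count. A standalone check, using the same loop structure with an oracle velocity field:

```python
import torch

torch.manual_seed(0)
x0 = torch.randn(2, 64, 2)
target = torch.randn(2, 64, 2)

def run_euler(x, step_fn, n_steps):
    # Same forward Euler loop as in flow_matching.py
    ts = torch.linspace(0.0, 1.0, n_steps + 1)
    for i in range(n_steps):
        dt = ts[i + 1] - ts[i]
        x = x + dt * step_fn(x=x, t=ts[i])
    return x

# Oracle velocity for a straight path: constant v = target - x0
step_fn = lambda *, x, t: target - x0

one = run_euler(x0.clone(), step_fn, 1)
ten = run_euler(x0.clone(), step_fn, 10)
print(torch.allclose(one, target, atol=1e-4),
      torch.allclose(ten, target, atol=1e-4))  # True True
```

A learned velocity field is only approximately constant along the path, which is why a handful of steps (rather than one) is used in practice.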

Usage Example

Here’s a complete example of sampling trajectories:
import torch
from alpamayo_r1.diffusion.flow_matching import FlowMatching
from alpamayo_r1.action_space.unicycle_accel_curvature import (
    UnicycleAccelCurvatureActionSpace
)

# Initialize diffusion model
diffusion = FlowMatching(
    x_dims=[64, 2],  # 64 waypoints, 2D actions (accel, curvature)
    num_inference_steps=10,
)

# Initialize action space
action_space = UnicycleAccelCurvatureActionSpace(
    n_waypoints=64,
    dt=0.1,
)

# Define step function (typically a trained neural network)
def step_fn(*, x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
    # x: (batch_size, 64, 2) - current noisy actions
    # t: scalar tensor in [0, 1] - current timestep
    # Returns: (batch_size, 64, 2) - predicted velocity field

    # In practice, this would call a model conditioned on observations:
    #   return trained_model(observations, x, t)

    # Placeholder for demonstration
    return torch.zeros_like(x)

# Sample trajectories
sampled_actions = diffusion.sample(
    batch_size=32,
    step_fn=step_fn,
    device=torch.device("cuda"),
)

# Convert actions to trajectories
# (history_xyz / history_rot are the ego vehicle's past poses, not defined here)
future_xyz, future_rot = action_space.action_to_traj(
    sampled_actions,
    history_xyz,
    history_rot,
)

Configuration Options

| Parameter | Type | Default | Description |
|---|---|---|---|
| x_dims | list[int] | Required | Dimensions of output data |
| int_method | str | "euler" | Integration method (currently only "euler") |
| num_inference_steps | int | 10 | Number of denoising iterations |
These can be overridden at sampling time:
samples = diffusion.sample(
    batch_size=16,
    step_fn=step_fn,
    inference_step=20,  # Override default num_inference_steps
    int_method="euler", # Override default int_method
)

Returning Intermediate Steps

For visualization or analysis, you can retrieve all intermediate denoising steps:
all_steps, time_steps = diffusion.sample(
    batch_size=1,
    step_fn=step_fn,
    device=device,
    return_all_steps=True,
)

# all_steps: (1, num_inference_steps+1, 64, 2)
# time_steps: (num_inference_steps+1,) in range [0, 1]

# Visualize denoising process
for i, (step, t) in enumerate(zip(all_steps[0], time_steps)):
    print(f"Step {i} at t={t.item():.2f}")
    # step: (64, 2) - actions at this denoising step

Training Flow Matching Models

While the sampling code is shown above, training typically follows this pattern:
# Training loop (conceptual)
for batch in dataloader:
    # Get ground truth trajectories
    gt_actions = action_space.traj_to_action(
        hist_xyz, hist_rot,
        fut_xyz, fut_rot,
    )
    
    # Sample random noise and timesteps
    noise = torch.randn_like(gt_actions)
    t = torch.rand(batch_size, 1, 1)
    
    # Create noisy interpolation: x_t = t*data + (1-t)*noise
    x_t = t * gt_actions + (1 - t) * noise
    
    # Target velocity is the difference
    target_v = gt_actions - noise
    
    # Predict velocity field
    pred_v = model(observations, x_t, t)
    
    # Loss: MSE between predicted and target velocity
    # Loss: MSE between predicted and target velocity
    loss = F.mse_loss(pred_v, target_v)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
The key insight: the target velocity v = x₁ - x₀ is simply the direction from noise to data.
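The loop above can be made runnable end to end with toy stand-ins. Here a small MLP replaces the observation-conditioned model and zeros replace real trajectories; all names and sizes are illustrative:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
batch_size, n_waypoints, action_dim = 32, 64, 2

# Toy stand-in for the observation-conditioned denoiser
model = nn.Sequential(
    nn.Linear(n_waypoints * action_dim + 1, 128),
    nn.SiLU(),
    nn.Linear(128, n_waypoints * action_dim),
)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

losses = []
for _ in range(100):
    gt_actions = torch.zeros(batch_size, n_waypoints, action_dim)  # toy "data"
    noise = torch.randn_like(gt_actions)
    t = torch.rand(batch_size, 1, 1)

    x_t = t * gt_actions + (1 - t) * noise   # straight-line interpolant
    target_v = gt_actions - noise            # constant velocity target

    inp = torch.cat([x_t.reshape(batch_size, -1),
                     t.reshape(batch_size, 1)], dim=-1)
    pred_v = model(inp).reshape(batch_size, n_waypoints, action_dim)

    loss = F.mse_loss(pred_v, target_v)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses.append(loss.item())

print(losses[-1] < losses[0])  # loss decreases on the toy problem
```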

Inference Speed Considerations

Flow matching enables faster sampling than traditional diffusion models:
| Inference Steps | Latency (approx.) | Quality |
|---|---|---|
| 1 | ~10 ms | Low (single-step approximation) |
| 5 | ~50 ms | Medium (good for real-time) |
| 10 | ~100 ms | High (recommended default) |
| 20+ | ~200 ms+ | Very high (diminishing returns) |
For autonomous driving, 5-10 steps typically provide the best speed/quality tradeoff.
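The latencies above depend on hardware and model size, so it is worth measuring the tradeoff yourself. A rough sketch, with a trivial stand-in where your trained step function would go:

```python
import time
import torch

def time_sampling(step_fn, n_steps, batch_size=32, x_dims=(64, 2), n_runs=5):
    """Best wall-clock latency of an n-step Euler sampling loop over n_runs."""
    ts = torch.linspace(0.0, 1.0, n_steps + 1)
    best = float("inf")
    for _ in range(n_runs):
        x = torch.randn(batch_size, *x_dims)
        start = time.perf_counter()
        for i in range(n_steps):
            x = x + (ts[i + 1] - ts[i]) * step_fn(x=x, t=ts[i])
        best = min(best, time.perf_counter() - start)
    return best

dummy = lambda *, x, t: -x  # stand-in; substitute your trained step function
for n in (1, 5, 10, 20):
    print(f"{n:>2} steps: {time_sampling(dummy, n) * 1e3:.2f} ms")
```

On GPU, call `torch.cuda.synchronize()` before reading the timer, since kernel launches are asynchronous.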

Advanced: Conditional Generation

Flow matching naturally supports conditional generation by including observations in the step function:
def conditional_step_fn(x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
    # Combine noisy actions with observations
    features = encoder(
        camera_images=images,      # Visual context
        lidar_points=lidar,        # 3D scene geometry
        history_traj=hist_xyz,     # Past motion
        map_features=map_data,     # HD map
    )
    
    # Predict velocity conditioned on all context
    velocity = denoiser(
        x=x,                       # Current noisy actions
        t=t,                       # Timestep
        context=features,          # Conditioning information
    )
    
    return velocity

# Sample trajectories conditioned on observations
actions = diffusion.sample(
    batch_size=32,
    step_fn=conditional_step_fn,
)
This conditioning allows the model to generate context-aware, scene-appropriate trajectories.

Best Practices

  1. Start with 10 inference steps: Good balance of speed and quality
  2. Use FP16/BF16: Mixed precision can speed up sampling 2x with minimal quality loss
  3. Batch inference: Process multiple samples in parallel for efficiency
  4. Cache features: If generating multiple samples for the same scene, encode observations once
  5. Compile models: Use torch.compile() for faster step function execution
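Practice 4 (cache features) can be implemented with a simple closure so observations are encoded once and reused across all samples and denoising steps. The `encoder`/`denoiser` names here are hypothetical stand-ins:

```python
import torch

def make_cached_step_fn(encoder, denoiser, observations):
    """Encode observations a single time, reuse the features for every step."""
    features = encoder(observations)  # computed once, captured by the closure
    def step_fn(*, x, t):
        return denoiser(x=x, t=t, context=features)
    return step_fn

# Toy stand-ins so the sketch runs end to end (real modules are networks)
encoder = lambda obs: obs.mean(dim=-1, keepdim=True)
denoiser = lambda *, x, t, context: -x + 0.0 * context.sum()

step_fn = make_cached_step_fn(encoder, denoiser, torch.randn(32, 256))
v = step_fn(x=torch.randn(32, 64, 2), t=torch.tensor(0.5))
print(v.shape)  # torch.Size([32, 64, 2])
```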

Comparison to Other Diffusion Methods

| Method | Training | Sampling Speed | Implementation Complexity |
|---|---|---|---|
| DDPM | Stable | Slow (100+ steps) | Medium |
| DDIM | Stable | Medium (20-50 steps) | Medium |
| Flow Matching | Very stable | Fast (5-10 steps) | Low |
Flow matching’s straight paths in probability space (rather than noisy random walks) enable fewer sampling steps while maintaining quality.

References

See the source code at diffusion/base.py and diffusion/flow_matching.py for full implementation details.
