verl’s architecture separates the single-process control flow from multi-process distributed computation, which makes it straightforward to implement new RL algorithms without touching the core framework. The controller (driver) process orchestrates distributed worker groups over Ray, while each worker group executes Single-Process-Multiple-Data (SPMD) computations across GPUs. This page explains the core extension pattern, walks through an online DPO implementation, and documents how to add new model support for the FSDP and Megatron-LM backends.
Key idea: A single controller process drives multi-process computation and data communication. All GPU work happens inside worker classes that expose plain Python method calls to the controller — the framework handles data dispatch and collection transparently.
Overall Approach
Extending verl to a new algorithm follows three steps that build on each other.

First, for each model role in your algorithm, enumerate the SPMD operations it must perform. For online DPO these are:

- generate_sequences
- compute_reward
- infer (compute log prob)
- update

Each operation becomes a method on a Worker subclass. Every method decorated with @register is callable from the controller, with data dispatch and collection handled automatically.

Next, implement each worker class as a @ray.remote class that inherits from Worker. Workers run the distributed computation: they are the “computation processes” that the controller drives. In the code below, Model and dpo_loss are placeholders for your own model class and loss function.

```python
import ray
import torch.optim as optim
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

from verl import DataProto
from verl.single_controller.base import Worker
from verl.single_controller.base.decorator import register, Dispatch


@ray.remote
class SampleGenerator(Worker):
    def __init__(self, config):
        super().__init__()
        self.config = config

    @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)
    def generate_sequences(self, data):
        # data is a DataProto shard: each worker processes its DP slice and
        # should return a DataProto holding the generated responses
        pass


@ray.remote
class ReferencePolicy(Worker):
    def __init__(self):
        super().__init__()
        self.model = Model()  # placeholder for your reference model

    @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)
    def infer(self, data):
        # Must return a DataProto, e.g. one holding 'ref_log_prob'
        return self.model(data)


@ray.remote
class DPOActor(Worker):
    def __init__(self):
        super().__init__()
        # FSDP wrapping assumes torch.distributed has already been initialized
        self.model = FSDP(Model())
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-3)
        self.loss_fn = dpo_loss  # placeholder for your DPO loss

    @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)
    def update(self, data):
        self.optimizer.zero_grad()
        logits = self.model(data)
        loss = self.loss_fn(logits)
        loss.backward()
        self.optimizer.step()
        # DP_COMPUTE_PROTO collects DataProto outputs, so report metrics in one
        return DataProto(meta_info={"metrics": {"loss": loss.item()}})
```
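DPOActor leaves dpo_loss abstract. As a minimal sketch, assuming the standard DPO objective computed on summed per-sequence log probabilities and a hypothetical beta temperature of 0.1, the loss could look like the function below. Its signature is illustrative and not part of verl; note that it takes four log-probability tensors rather than the single logits argument of the placeholder call above, so a real update step would first compute policy log probs for the chosen and rejected responses and read the reference ones from ref_log_prob.

```python
import torch
import torch.nn.functional as F


def dpo_loss(
    policy_chosen_logps: torch.Tensor,
    policy_rejected_logps: torch.Tensor,
    ref_chosen_logps: torch.Tensor,
    ref_rejected_logps: torch.Tensor,
    beta: float = 0.1,
) -> torch.Tensor:
    """Standard DPO loss on per-sequence log probabilities (each of shape [batch])."""
    # Log-ratio of policy to reference for the preferred and dispreferred responses
    chosen_logratios = policy_chosen_logps - ref_chosen_logps
    rejected_logratios = policy_rejected_logps - ref_rejected_logps
    # -log sigmoid(beta * margin), averaged over the batch
    logits = beta * (chosen_logratios - rejected_logratios)
    return -F.logsigmoid(logits).mean()


# Identical policy and reference give a zero margin, so the loss equals log(2)
zeros = torch.zeros(4)
print(dpo_loss(zeros, zeros, zeros, zeros).item())  # ~0.6931
```

A positive margin (the policy raises the chosen response relative to the reference more than the rejected one) drives the loss below log(2).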
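The Dispatch.DP_COMPUTE_PROTO mode requires DataProto as the data interface (defined in protocol.py). It shards the input along the DP dimension before dispatch and concatenates the results on collection. Pre-built dispatch modes are defined in decorator.py. If none of them fits, you can pass a dict with your own dispatch_fn and collect_fn instead:

```python
import torch
from verl.single_controller.base.decorator import register


def dispatch_data(worker_group, data):
    # Split the input into one chunk per worker. A dispatch function returns
    # (args, kwargs), where each positional argument is a list indexed by rank.
    return (data.chunk(worker_group.world_size),), {}


def collect_data(worker_group, output):
    # output holds one result per worker (assumed to be tensors here)
    return torch.cat(output)


dispatch_mode = {
    'dispatch_fn': dispatch_data,
    'collect_fn': collect_data,
}


@register(dispatch_mode=dispatch_mode)
def generate_sequences(self, data):
    pass
```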
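To make the (args, kwargs) contract concrete, here is a pure-Python sketch that mimics, without Ray, how a worker group drives a custom dispatch function and collect function: dispatch produces one argument list per rank, each rank runs the method on its slice, and collect merges the per-rank outputs in rank order. LocalWorkerGroup and EchoWorker are hypothetical stand-ins rather than verl classes, and dispatch_data and collect_data here are list-based variants of the functions above.

```python
from typing import Any, Callable, List


def dispatch_data(worker_group, data: List[Any]):
    # Split a list into world_size contiguous chunks, one per rank
    n = worker_group.world_size
    size = (len(data) + n - 1) // n
    chunks = [data[i * size:(i + 1) * size] for i in range(n)]
    return (chunks,), {}


def collect_data(worker_group, output: List[List[Any]]) -> List[Any]:
    # Concatenate the per-rank results back into one list, in rank order
    return [item for per_rank in output for item in per_rank]


class LocalWorkerGroup:
    """Hypothetical in-process stand-in for a Ray worker group."""

    def __init__(self, workers: List[Any]):
        self.workers = workers
        self.world_size = len(workers)

    def call(self, method_name: str, dispatch_fn: Callable, collect_fn: Callable, *args, **kwargs):
        args, kwargs = dispatch_fn(self, *args, **kwargs)
        # Rank i receives the i-th element of every dispatched argument
        outputs = [
            getattr(worker, method_name)(
                *(a[rank] for a in args), **{k: v[rank] for k, v in kwargs.items()}
            )
            for rank, worker in enumerate(self.workers)
        ]
        return collect_fn(self, outputs)


class EchoWorker:
    def __init__(self, rank: int):
        self.rank = rank

    def generate_sequences(self, data):
        # Tag each item with the rank that processed it
        return [(self.rank, x) for x in data]


group = LocalWorkerGroup([EchoWorker(r) for r in range(2)])
print(group.call("generate_sequences", dispatch_data, collect_data, [1, 2, 3, 4]))
# [(0, 1), (0, 2), (1, 3), (1, 4)]
```

In the real framework, the per-rank calls are remote Ray actor invocations that run in parallel, but the dispatch and collect bookkeeping follows the same shape.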
Finally, initialize RayWorkerGroup objects for each worker class in the controller process, then call worker methods as if they were local function calls. Under the hood, data is split, sent to the remote workers, computed in parallel, and the results are gathered and merged.

```python
import ray
from verl.single_controller.ray import (
    RayWorkerGroup,
    RayClassWithInitArgs,
    RayResourcePool,
)


@ray.remote(num_cpus=1)
def main_task(config):
    # 16 GPUs across 2 nodes
    resource_pool = RayResourcePool(process_on_nodes=[8, 8])

    # Construct worker groups; each wraps a fleet of Ray actors
    ray_cls = RayClassWithInitArgs(SampleGenerator, config=config)
    sample_gen = RayWorkerGroup(resource_pool, ray_cls)

    ray_cls = RayClassWithInitArgs(ReferencePolicy)
    ref_policy = RayWorkerGroup(resource_pool, ray_cls)

    ray_cls = RayClassWithInitArgs(DPOActor)
    dpo_policy = RayWorkerGroup(resource_pool, ray_cls)

    # DataLoader, generate_scores, and generate_pairwise_data are placeholders
    # for your own data pipeline and scoring logic
    dataloader = DataLoader()
    for data in dataloader:
        # Each worker-group call runs remote distributed computation transparently
        data = sample_gen.generate_sequences(data)  # N responses per prompt
        data = generate_scores(data)                # score responses
        data = generate_pairwise_data(data)         # form preference pairs
        # infer returns a DataProto; merge its ref_log_prob into the batch
        data = data.union(ref_policy.infer(data))
        dpo_policy.update(data)
```
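generate_scores and generate_pairwise_data are left abstract in the loop above. One common way to form preference pairs is to take the highest- and lowest-scoring of the N responses for each prompt as the chosen and rejected pair, skipping prompts whose scores all tie. The sketch below does this on plain Python lists rather than DataProto, for readability; its signature is hypothetical and differs from the single-argument placeholder in the loop.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def generate_pairwise_data(
    prompts: List[str], responses: List[str], scores: List[float]
) -> List[Dict[str, str]]:
    """Build one (chosen, rejected) pair per prompt from N scored responses.

    The three lists are aligned: entry i is one sampled response to prompts[i].
    """
    grouped: Dict[str, List[Tuple[float, str]]] = defaultdict(list)
    for prompt, response, score in zip(prompts, responses, scores):
        grouped[prompt].append((score, response))

    pairs = []
    for prompt, scored in grouped.items():
        best = max(scored, key=lambda t: t[0])
        worst = min(scored, key=lambda t: t[0])
        if best[0] == worst[0]:
            continue  # no preference signal when all responses tie
        pairs.append({"prompt": prompt, "chosen": best[1], "rejected": worst[1]})
    return pairs


pairs = generate_pairwise_data(
    prompts=["p1", "p1", "p1", "p2", "p2"],
    responses=["a", "b", "c", "d", "e"],
    scores=[0.2, 0.9, 0.1, 0.5, 0.5],
)
print(pairs)  # [{'prompt': 'p1', 'chosen': 'b', 'rejected': 'c'}]
```

Best-versus-worst is only one pairing strategy; forming all pairs with a score gap above some threshold is a common alternative that yields more training pairs per prompt.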
Different WorkerGroup objects can share the same resource_pool (colocated) or use separate pools. See ray_trainer.py for how the PPO trainer uses create_colocated_worker_cls for colocated workers.

Adding a New Model to the FSDP Backend
The FSDP backend can work with any HuggingFace model. However, the default hf_weight_loader gathers the full model state dict during weight synchronization to vLLM, which can cause OOM for large models. The preferred approach is dtensor_weight_loader, which gathers weights layer by layer.
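A back-of-envelope calculation shows why gathering layer by layer matters. Assuming a hypothetical 7B-parameter model with 32 equally sized layers stored in bf16 (2 bytes per parameter), and ignoring embeddings, activations, and communication buffers, the memory needed to hold the gathered weights at once compares as follows (GB here means 10^9 bytes):

```python
def gathered_bytes(num_params: float, bytes_per_param: int = 2, num_layers: int = 1) -> float:
    """Bytes held at once when the weights are gathered in num_layers equal pieces."""
    return num_params * bytes_per_param / num_layers


params = 7e9
full = gathered_bytes(params)                      # whole state dict at once
per_layer = gathered_bytes(params, num_layers=32)  # one layer at a time
print(f"full: {full / 1e9:.1f} GB, per layer: {per_layer / 1e9:.4f} GB")
# full: 14.0 GB, per layer: 0.4375 GB
```

Real layers are not equally sized (embedding and output layers are larger), so this only indicates the order of magnitude of the saving.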
Supported models with the dtensor weight loader (in third_party/vllm/dtensor_weight_loaders.py):

- GPT2LMHeadModel
- LlamaForCausalLM
- LLaMAForCausalLM
- MistralForCausalLM
- InternLMForCausalLM
- AquilaModel
- AquilaForCausalLM
- Phi3ForCausalLM
- GemmaForCausalLM
- Gemma2ForCausalLM
- GPTBigCodeForCausalLM
- Starcoder2ForCausalLM
- Qwen2ForCausalLM
- DeepseekV2ForCausalLM
To add a new model, open the vLLM model file for your architecture and copy its load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]) method into dtensor_weight_loaders.py, then turn it into a standalone loader function that takes the actor weights and the vLLM model:

```python
# Before (vLLM style)
def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]):
    ...

# After (dtensor loader style)
def gemma_dtensor_weight_loader(actor_weights: Dict, vllm_model: nn.Module) -> nn.Module:
    ...
```

Replace self with vllm_model throughout, and change `for name, loaded_weight in weights:` to `for name, loaded_weight in actor_weights.items():`.

Before each weight load call, insert redistribute_dtensor to handle the DTensor-to-local-tensor conversion:

```diff
+ local_loaded_weight = redistribute_dtensor(param_name=name, loaded_weights=loaded_weight)
  param = params_dict[name]
  weight_loader = param.weight_loader
- weight_loader(param, loaded_weight, shard_id)
+ weight_loader(param, local_loaded_weight.to(dtype=param.dtype), shard_id)
```
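Putting the last two edits together, a converted loader has roughly the shape below. This is a self-contained sketch, not verl's code: redistribute_dtensor is stubbed as an identity function, FakeParam is a hypothetical stand-in for a vLLM parameter, llama_style_dtensor_weight_loader is a hypothetical name that receives a params_dict instead of vllm_model, the dtype cast is omitted because the stub weights are plain values, and the stacked_params_mapping entries assume the fused qkv_proj/gate_up_proj layout of Llama-style vLLM models (check your model's vLLM file for its actual mapping).

```python
from typing import Any, Dict


def redistribute_dtensor(param_name: str, loaded_weights: Any) -> Any:
    # Stub (hypothetical): the real helper turns a sharded DTensor into the
    # local tensor that vLLM's weight_loader expects
    return loaded_weights


class FakeParam:
    """Hypothetical stand-in for a vLLM parameter; records loaded shards."""

    def __init__(self):
        self.loaded: Dict[Any, Any] = {}

    def weight_loader(self, param: "FakeParam", weight: Any, shard_id: Any = None) -> None:
        param.loaded[shard_id] = weight


def llama_style_dtensor_weight_loader(
    actor_weights: Dict[str, Any], params_dict: Dict[str, FakeParam]
) -> None:
    # (fused vLLM parameter name, checkpoint parameter name, shard id)
    stacked_params_mapping = [
        ("qkv_proj", "q_proj", "q"),
        ("qkv_proj", "k_proj", "k"),
        ("qkv_proj", "v_proj", "v"),
        ("gate_up_proj", "gate_proj", 0),
        ("gate_up_proj", "up_proj", 1),
    ]
    # Iterate over actor_weights.items() instead of a weights iterable
    for name, loaded_weight in actor_weights.items():
        for param_name, shard_name, shard_id in stacked_params_mapping:
            if shard_name not in name:
                continue
            fused_name = name.replace(shard_name, param_name)
            # Convert before loading (the real code also casts to param.dtype)
            local_loaded_weight = redistribute_dtensor(param_name=fused_name, loaded_weights=loaded_weight)
            param = params_dict[fused_name]
            param.weight_loader(param, local_loaded_weight, shard_id)
            break
        else:
            # Unfused parameter: load it directly without a shard id
            local_loaded_weight = redistribute_dtensor(param_name=name, loaded_weights=loaded_weight)
            param = params_dict[name]
            param.weight_loader(param, local_loaded_weight)


params = {"layers.0.qkv_proj": FakeParam(), "layers.0.o_proj": FakeParam()}
weights = {"layers.0.q_proj": "Wq", "layers.0.k_proj": "Wk", "layers.0.o_proj": "Wo"}
llama_style_dtensor_weight_loader(weights, params)
print(params["layers.0.qkv_proj"].loaded)  # {'q': 'Wq', 'k': 'Wk'}
print(params["layers.0.o_proj"].loaded)  # {None: 'Wo'}
```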
Then add your function to `__MODEL_DTENSOR_WEIGHT_LOADER_REGISTRY__` at the bottom of dtensor_weight_loaders.py.

Adding a New Model to the Megatron-LM Backend
The Megatron-LM backend uses MCore’s GPTModel as its base. If your model is configurable via TransformerLayerSpec, use GPTModel directly; otherwise, implement a custom ModelLayerSpec and ModelLayer for your architecture.

In either case, pass the correct LayerSpec, TransformerConfig, and HuggingfaceConfig as arguments to initialize GPTModel, then return the model.

The Megatron backend uses mbridge for bidirectional weight conversion between HuggingFace checkpoints and MCore format; it has to be enabled explicitly.
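Building the TransformerConfig for GPTModel usually means translating fields from the HuggingFace config. The sketch below does this for a Llama-style architecture as a hypothetical helper that returns plain keyword arguments; it is not verl's or mbridge's conversion code and covers only a subset of fields. The keys are Megatron-Core TransformerConfig field names, and the HuggingFace config is faked with SimpleNamespace so the example runs without either library installed.

```python
from types import SimpleNamespace
from typing import Any, Dict


def hf_llama_to_transformer_config_kwargs(hf_config: Any) -> Dict[str, Any]:
    """Map Llama-style HuggingFace config fields to MCore TransformerConfig kwargs."""
    return {
        "num_layers": hf_config.num_hidden_layers,
        "hidden_size": hf_config.hidden_size,
        "num_attention_heads": hf_config.num_attention_heads,
        # Grouped-query attention: number of key/value heads
        "num_query_groups": hf_config.num_key_value_heads,
        "ffn_hidden_size": hf_config.intermediate_size,
        "layernorm_epsilon": hf_config.rms_norm_eps,
        "normalization": "RMSNorm",
        # Gated (SwiGLU-style) MLP without linear biases; a full config would
        # also set activation_func and parallelism options
        "gated_linear_unit": True,
        "add_bias_linear": False,
    }


hf_config = SimpleNamespace(
    num_hidden_layers=32, hidden_size=4096, num_attention_heads=32,
    num_key_value_heads=8, intermediate_size=14336, rms_norm_eps=1e-5,
)
kwargs = hf_llama_to_transformer_config_kwargs(hf_config)
print(kwargs["num_query_groups"], kwargs["ffn_hidden_size"])  # 8 14336
```

With megatron-core installed, such kwargs could feed TransformerConfig(**kwargs), which together with a layer spec is then passed to GPTModel; constructing GPTModel additionally requires model-parallel state to be initialized.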
Reusing PPO Infrastructure
You do not need to implement everything from scratch. The PPO algorithm already provides complete FSDP and Megatron-LM backends in verl/trainer/ppo/. For a new algorithm you can:

- Reuse vllm_rollout.py for generation (the SampleGenerator logic)
- Reuse fsdp_workers.py for FSDP-based actor, reference policy, and critic computation
- Reuse megatron_workers.py for the Megatron-LM-based equivalents

The controller logic (main_task) and the loss function need to be written from scratch for a new algorithm.