PPORayTrainer is the heart of verl’s distributed RL training. It runs as a single driver process (on a CPU node by default) and orchestrates the heavy lifting, namely data loading, GPU resource allocation, and the full PPO training loop, by issuing remote procedure calls to WorkerGroup instances that run on the GPU workers. The driver itself only coordinates; generation and model forward/backward passes run inside the workers.
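As a mental model of the driver/worker split, here is a minimal single-process sketch. It is not verl's implementation: ToyWorker and ToyWorkerGroup are hypothetical names, ordinary method calls stand in for Ray remote calls, and the even split followed by in-order concatenation is an assumed dispatch policy that mimics data-parallel execution.

```python
from typing import Any, List


class ToyWorker:
    """Hypothetical stand-in for one GPU process; `rank` identifies its shard."""

    def __init__(self, rank: int) -> None:
        self.rank = rank

    def generate(self, prompts: List[str]) -> List[str]:
        # A real worker would run the rollout engine here.
        return [f"{p} -> response(rank={self.rank})" for p in prompts]


class ToyWorkerGroup:
    """Hypothetical group: splits a batch across workers, calls each, concatenates."""

    def __init__(self, world_size: int) -> None:
        self.workers = [ToyWorker(rank) for rank in range(world_size)]

    def _dispatch(self, batch: List[Any]) -> List[List[Any]]:
        n = len(self.workers)
        if len(batch) % n != 0:
            raise ValueError("batch size must be divisible by world size")
        chunk = len(batch) // n
        return [batch[i * chunk:(i + 1) * chunk] for i in range(n)]

    def call(self, method: str, batch: List[Any]) -> List[Any]:
        shards = self._dispatch(batch)
        # With Ray, these calls would be issued concurrently and awaited together.
        outputs = [getattr(w, method)(shard) for w, shard in zip(self.workers, shards)]
        return [item for out in outputs for item in out]  # collect in rank order


wg = ToyWorkerGroup(world_size=2)
result = wg.call("generate", ["p0", "p1", "p2", "p3"])
print(result)
# ['p0 -> response(rank=0)', 'p1 -> response(rank=0)', 'p2 -> response(rank=1)', 'p3 -> response(rank=1)']
```

The point it illustrates is that the driver holds only the batch and the name of the call; the per-shard work happens inside the workers.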

Three Core Responsibilities

PPORayTrainer is organized around three distinct concerns: preparing data, initializing distributed worker groups, and running the iterative RL training loop.
Data Preparation
The trainer loads complete batches of prompts from the dataset and dispatches them to the appropriate GPU worker groups. The RLHFDataset class handles the preprocessing pipeline end to end: it reads preprocessed Parquet files, applies the chat template to each prompt, tokenizes it, and then pads the result or truncates prompts that exceed the maximum prompt length.
```python
self.train_dataset = RLHFDataset(
    data_files=self.config.data.train_files,
    tokenizer=self.tokenizer,
    config=self.config.data,
)
```
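The pad-and-truncate stage can be sketched for a single, already tokenized prompt (the chat-template and tokenization steps are assumed to have run). postprocess_prompt and its truncation argument are hypothetical, and left padding is an assumption, chosen because batched generation with decoder-only models usually pads on the left so every prompt ends at the same position.

```python
from typing import Dict, List


def postprocess_prompt(token_ids: List[int], max_prompt_length: int, pad_token_id: int,
                       truncation: str = "error") -> Dict[str, List[int]]:
    """Hypothetical helper: fit one tokenized prompt to a fixed length with left padding."""
    if len(token_ids) > max_prompt_length:
        if truncation == "left":
            token_ids = token_ids[-max_prompt_length:]  # keep the end of the prompt
        elif truncation == "right":
            token_ids = token_ids[:max_prompt_length]   # keep the start of the prompt
        else:
            raise ValueError(f"prompt has {len(token_ids)} tokens > {max_prompt_length}")
    pad = max_prompt_length - len(token_ids)
    input_ids = [pad_token_id] * pad + token_ids  # left padding
    attention_mask = [0] * pad + [1] * len(token_ids)
    # Position ids count real tokens only; padded slots are clamped to 0.
    position_ids, running = [], 0
    for m in attention_mask:
        running += m
        position_ids.append(max(running - 1, 0))
    return {"input_ids": input_ids, "attention_mask": attention_mask, "position_ids": position_ids}


out = postprocess_prompt([11, 12, 13], max_prompt_length=5, pad_token_id=0)
print(out)
# {'input_ids': [0, 0, 11, 12, 13], 'attention_mask': [0, 0, 1, 1, 1], 'position_ids': [0, 0, 0, 1, 2]}
```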
The training DataLoader then iterates over this dataset in batches of data.train_batch_size prompts. Each batch arrives as a plain dictionary and is converted with DataProto.from_single_dict into a DataProto, verl’s unified data container for moving data between the driver and the remote GPU workers.
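To make the pop/union data movement used by the trainer concrete, here is a toy container with DataProto-like semantics. MiniProto is hypothetical and holds plain Python lists rather than tensors; the conflict check in union is an assumed behavior for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class MiniProto:
    """Hypothetical stand-in for a DataProto: named columns of equal batch size."""
    batch: Dict[str, List[Any]] = field(default_factory=dict)

    @classmethod
    def from_single_dict(cls, data: Dict[str, List[Any]]) -> "MiniProto":
        if len({len(v) for v in data.values()}) > 1:
            raise ValueError("all columns must have the same batch size")
        return cls(dict(data))

    def pop(self, batch_keys: List[str]) -> "MiniProto":
        # Move the requested columns out of self into a new container.
        return MiniProto({k: self.batch.pop(k) for k in batch_keys})

    def union(self, other: "MiniProto") -> "MiniProto":
        # Merge columns; a key present in both must hold identical data.
        merged = dict(self.batch)
        for k, v in other.batch.items():
            if k in merged and merged[k] != v:
                raise ValueError(f"conflicting values for key {k!r}")
            merged[k] = v
        return MiniProto(merged)


batch = MiniProto.from_single_dict({"input_ids": [[1, 2], [3, 4]], "uid": ["a", "b"]})
gen_batch = batch.pop(batch_keys=["input_ids"])    # prompts go to the rollout workers
gen_output = MiniProto({"responses": [[5], [6]]})  # what the workers might return
batch = batch.union(gen_output)
print(sorted(batch.batch))  # ['responses', 'uid']
```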
WorkerGroup Initialization
Before training begins, the trainer allocates GPU resources using RayResourcePool and wires each model role (actor, rollout, reference policy, critic, reward model) to a set of Ray actors. The max_colocate_count parameter is central here: it controls how many WorkerGroup processes share the same set of GPUs.
```python
# max_colocate_count=1 merges all WorkerGroups into one process per GPU (recommended for FSDP).
# max_colocate_count>1 keeps WorkerGroups in separate processes (useful for Megatron,
# where each role may have a different 3D-parallel topology).
resource_pool = RayResourcePool(
    process_on_nodes=[config.trainer.n_gpus_per_node] * config.trainer.nnodes,
    use_gpu=True,
    max_colocate_count=1,
)

# Wrap the worker class for remote construction; any constructor arguments
# (such as the worker config) would be passed after cls.
actor_rollout_cls = RayClassWithInitArgs(cls=ActorRolloutWorker)

# Create the worker group
actor_rollout_worker_group = RayWorkerGroup(
    resource_pool=resource_pool,
    ray_cls_with_init=actor_rollout_cls,
)
```
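RayClassWithInitArgs defers construction: it records the class and its arguments on the driver so that the worker group can build one instance per GPU later. The pattern can be sketched in plain Python. ClassWithInitArgs, EchoWorker, and spawn_group are hypothetical; passing rank and world_size as keyword arguments is a simplification for the toy, not a description of how verl hands them to workers.

```python
from typing import Any, Dict, List, Type


class ClassWithInitArgs:
    """Hypothetical analogue of RayClassWithInitArgs: a class plus the
    arguments to build it with, captured now and instantiated later."""

    def __init__(self, cls: Type, *args: Any, **kwargs: Any) -> None:
        self.cls, self.args, self.kwargs = cls, args, kwargs

    def build(self, **extra: Any) -> Any:
        # `extra` carries per-process values (here rank/world_size) known only at spawn time.
        return self.cls(*self.args, **self.kwargs, **extra)


class EchoWorker:
    """Hypothetical worker that just remembers its config and position."""

    def __init__(self, config: Dict[str, Any], rank: int, world_size: int) -> None:
        self.config, self.rank, self.world_size = config, rank, world_size


def spawn_group(cls_with_init: ClassWithInitArgs, world_size: int) -> List[Any]:
    # One instance per GPU slot; a Ray-based group would create remote actors instead.
    return [cls_with_init.build(rank=r, world_size=world_size) for r in range(world_size)]


workers = spawn_group(ClassWithInitArgs(EchoWorker, config={"lr": 1e-6}), world_size=4)
print([w.rank for w in workers])  # [0, 1, 2, 3]
```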
For the common colocated case where the actor, rollout engine, and reference policy all share the same GPUs, verl provides create_colocated_worker_cls to merge multiple roles into a single process and avoid redundant CUDA/distributed context overhead:
```python
all_wg = {}
for resource_pool, class_dict in self.resource_pool_to_cls.items():
    worker_dict_cls = create_colocated_worker_cls(class_dict=class_dict)
    wg_dict = self.ray_worker_group_cls(
        resource_pool=resource_pool,
        ray_cls_with_init=worker_dict_cls,
    )
    spawn_wg = wg_dict.spawn(prefix_set=class_dict.keys())
    all_wg.update(spawn_wg)

if self.use_critic:
    self.critic_wg = all_wg["critic"]
    self.critic_wg.init_model()

if self.use_reference_policy:
    self.ref_policy_wg = all_wg["ref"]
    self.ref_policy_wg.init_model()

if self.use_rm:
    self.rm_wg = all_wg["rm"]
    self.rm_wg.init_model()

# Create the rollout last so that vLLM can estimate the available KV cache memory accurately
self.actor_rollout_wg = all_wg["actor_rollout"]
self.actor_rollout_wg.init_model()
```
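One way to picture create_colocated_worker_cls is a single object that hosts several role instances and re-exposes their methods under role-prefixed names, paired with a per-role handle that strips the prefix again, roughly what spawn(prefix_set=...) hands back. This is a hypothetical sketch: the "<role>_<method>" naming, make_fused_worker, and RoleView are assumptions made for illustration.

```python
from typing import Any, Dict, Type


class ActorRollout:
    def generate_sequences(self, prompt: str) -> str:
        return prompt + " <generated>"


class RefPolicy:
    def compute_ref_log_prob(self, prompt: str) -> float:
        return -1.5


def make_fused_worker(class_dict: Dict[str, Type]) -> Any:
    """Hypothetical: one object hosting several roles, each public method
    re-exposed as '<role>_<method>'."""
    instances = {role: cls() for role, cls in class_dict.items()}

    class FusedWorker:
        pass

    fused = FusedWorker()
    for role, inst in instances.items():
        for name in dir(inst):
            if not name.startswith("_"):
                setattr(fused, f"{role}_{name}", getattr(inst, name))
    return fused


class RoleView:
    """Hypothetical per-role handle: forwards `method` to '<role>_<method>'."""

    def __init__(self, fused: Any, role: str) -> None:
        self._fused, self._role = fused, role

    def __getattr__(self, name: str) -> Any:
        return getattr(self._fused, f"{self._role}_{name}")


fused = make_fused_worker({"actor_rollout": ActorRollout, "ref": RefPolicy})
views = {role: RoleView(fused, role) for role in ("actor_rollout", "ref")}
print(views["actor_rollout"].generate_sequences("hi"))  # hi <generated>
print(views["ref"].compute_ref_log_prob("hi"))          # -1.5
```

Both roles live in one object here, which is the analogue of sharing one process (and one CUDA context) per GPU.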
For the Megatron backend, merging WorkerGroups into the same process means all roles share the same 3D-parallel configuration. If you need different tensor, pipeline, or expert parallel sizes per role, give each role its own WorkerGroup (for example, against a separate RayResourcePool, or with max_colocate_count>1) instead of merging the roles with create_colocated_worker_cls.
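The constraint follows from the usual Megatron-style arithmetic: with world size W, tensor-parallel size TP, pipeline-parallel size PP, and context-parallel size CP, the data-parallel size is W / (TP * PP * CP). The sketch ignores expert parallelism, and the per-role layouts are hypothetical; roles with differing layouts like these cannot share one merged process.

```python
from typing import Dict


def data_parallel_size(world_size: int, tp: int, pp: int, cp: int = 1) -> int:
    """Data-parallel size implied by a Megatron-style layout (expert parallelism ignored)."""
    model_parallel = tp * pp * cp
    if world_size % model_parallel != 0:
        raise ValueError(f"world_size {world_size} not divisible by tp*pp*cp={model_parallel}")
    return world_size // model_parallel


# Hypothetical per-role layouts on a 16-GPU pool
layouts: Dict[str, Dict[str, int]] = {
    "actor": {"tp": 4, "pp": 2},
    "critic": {"tp": 2, "pp": 1},
    "ref": {"tp": 4, "pp": 1},
}
dp = {role: data_parallel_size(16, **cfg) for role, cfg in layouts.items()}
print(dp)  # {'actor': 2, 'critic': 8, 'ref': 4}
```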
PPO Training Loop
Once workers are initialized, the driver runs the RL training loop. Each step issues RPC calls to the appropriate worker groups and combines the results to compute advantages on the driver CPU. All inputs and outputs are DataProto objects.
```python
def fit(self):
    global_steps = 0  # number of completed training steps; gates the critic warm-up

    for epoch in range(self.config.trainer.total_epochs):
        for batch_dict in self.train_dataloader:
            metrics = {}
            batch: DataProto = DataProto.from_single_dict(batch_dict)

            # 1. Extract prompt tokens and generate rollouts
            gen_batch = batch.pop(batch_keys=["input_ids", "attention_mask", "position_ids"])
            gen_batch_output = self.actor_rollout_wg.generate_sequences(gen_batch)
            batch = batch.union(gen_batch_output)

            # 2. Compute reference log-probabilities (for the KL penalty)
            if self.use_reference_policy:
                ref_log_prob = self.ref_policy_wg.compute_ref_log_prob(batch)
                batch = batch.union(ref_log_prob)

            # 3. Compute critic values (only when a critic is configured)
            if self.use_critic:
                values = self.critic_wg.compute_values(batch)
                batch = batch.union(values)

            # 4. Score with the reward model (if any); reward_fn then combines
            #    model scores with rule-based rewards
            if self.use_rm:
                reward_tensor = self.rm_wg.compute_rm_score(batch)
                batch = batch.union(reward_tensor)
            reward_tensor = self.reward_fn(batch)
            batch.batch["token_level_scores"] = reward_tensor

            # 5. Apply the KL penalty and compute advantages (on the driver CPU)
            batch, kl_metrics = apply_kl_penalty(batch, kl_ctrl=self.kl_ctrl_in_reward,
                                                 kl_penalty=self.config.algorithm.kl_penalty)
            metrics.update(kl_metrics)
            batch = compute_advantage(batch,
                                      self.config.algorithm.gamma,
                                      self.config.algorithm.lam,
                                      adv_estimator=self.config.algorithm.adv_estimator)

            # 6. Update critic
            if self.use_critic:
                critic_output = self.critic_wg.update_critic(batch)

            # 7. Update actor once the critic warm-up period has passed
            if self.config.trainer.critic_warmup <= global_steps:
                actor_output = self.actor_rollout_wg.update_actor(batch)

            global_steps += 1
```
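Step 5 runs on the driver. A minimal sketch for one unpadded response, assuming the simple "kl" penalty (log-probability difference between the policy and the reference, scaled by a coefficient beta) subtracted per token, followed by standard generalized advantage estimation with gamma and lam. apply_kl_penalty_toy and gae are hypothetical names; the sketch omits response masking and the advantage normalization a production implementation would typically apply.

```python
from typing import List, Tuple


def apply_kl_penalty_toy(scores: List[float], log_probs: List[float],
                         ref_log_probs: List[float], beta: float) -> List[float]:
    """Per-token reward = score - beta * (log pi - log pi_ref)."""
    return [s - beta * (lp - rlp) for s, lp, rlp in zip(scores, log_probs, ref_log_probs)]


def gae(rewards: List[float], values: List[float],
        gamma: float, lam: float) -> Tuple[List[float], List[float]]:
    """Generalized Advantage Estimation over one response; returns (advantages, returns)."""
    advantages = [0.0] * len(rewards)
    last_adv = 0.0
    for t in reversed(range(len(rewards))):
        next_value = values[t + 1] if t + 1 < len(values) else 0.0  # no value after the last token
        delta = rewards[t] + gamma * next_value - values[t]        # TD residual
        last_adv = delta + gamma * lam * last_adv
        advantages[t] = last_adv
    returns = [a + v for a, v in zip(advantages, values)]
    return advantages, returns


# The outcome score sits on the final token; the first token drifted from the reference.
rewards = apply_kl_penalty_toy(scores=[0.0, 0.0, 1.0], log_probs=[-0.5, -1.0, -1.0],
                               ref_log_probs=[-1.0, -1.0, -1.0], beta=0.1)
adv, ret = gae(rewards, values=[0.0, 0.0, 0.0], gamma=1.0, lam=1.0)
print(rewards, adv)  # approximately [-0.05, 0.0, 1.0] [0.95, 1.0, 1.0]
```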

Resource Pools and GPU Placement

RayResourcePool is the abstraction that maps logical worker groups onto physical GPU resources. The two most important parameters are process_on_nodes (a list where each entry gives the number of GPUs on that node) and max_colocate_count.
With max_colocate_count=1, each GPU's Ray placement-group bundle hosts a single worker process, so all WorkerGroups on that GPU are merged into it. Actor, rollout, and reference policy all live in one process per GPU, with no extra CUDA context and no duplicated device memory for distributed state.
For example, on a single node with 8 GPUs:
```yaml
trainer:
  n_gpus_per_node: 8
  nnodes: 1
```
```python
resource_pool = RayResourcePool(
    process_on_nodes=[8],  # 8 GPUs on 1 node
    use_gpu=True,
    max_colocate_count=1,
)
```
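process_on_nodes fixes how many worker slots exist and on which node each one sits. The sketch enumerates them for a hypothetical two-node, 8-GPU-per-node setup and models max_colocate_count as a per-GPU cap on worker processes. rank_layout and fits are hypothetical helpers, and node-major rank ordering is an assumption.

```python
from typing import List, Tuple


def rank_layout(process_on_nodes: List[int]) -> List[Tuple[int, int, int]]:
    """Return (global_rank, node_index, local_gpu) for every process slot."""
    layout, rank = [], 0
    for node, n_gpus in enumerate(process_on_nodes):
        for local in range(n_gpus):
            layout.append((rank, node, local))
            rank += 1
    return layout


def fits(num_worker_groups: int, max_colocate_count: int) -> bool:
    # Modeled constraint: each GPU slot hosts at most max_colocate_count worker processes.
    return num_worker_groups <= max_colocate_count


n_gpus_per_node, nnodes = 8, 2  # mirrors trainer.n_gpus_per_node / trainer.nnodes
layout = rank_layout([n_gpus_per_node] * nnodes)
print(len(layout), layout[9])                            # 16 (9, 1, 1)
print(fits(num_worker_groups=1, max_colocate_count=1))  # True: one fused process per GPU
print(fits(num_worker_groups=3, max_colocate_count=1))  # False: roles must be merged first
```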

Extending to Other RL Algorithms

The PPORayTrainer training loop is a concrete implementation of the HybridFlow dataflow. To extend it to algorithms such as GRPO or DPO, one approach is to subclass PPORayTrainer and override fit(), replacing or removing steps. GRPO, for example, removes the critic entirely and replaces per-token value estimation with a group-relative advantage. Changes confined to the advantage computation may instead be selected through algorithm.adv_estimator, which fit() already passes to compute_advantage. Refer to the extending algorithms guide for a worked example.
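The group-relative advantage mentioned for GRPO can be sketched directly: each response's scalar score is normalized by the mean and standard deviation of the scores of all responses sampled for the same prompt, and the result would then be broadcast to every token of that response. grpo_advantages is a hypothetical helper; the sample standard deviation, the eps value, and the handling of single-response groups are assumed conventions.

```python
import statistics
from collections import defaultdict
from typing import Dict, List, Tuple


def grpo_advantages(scores: List[float], group_ids: List[str], eps: float = 1e-6) -> List[float]:
    """Normalize each response's score against the other responses to the same prompt."""
    groups: Dict[str, List[float]] = defaultdict(list)
    for s, g in zip(scores, group_ids):
        groups[g].append(s)
    stats: Dict[str, Tuple[float, float]] = {}
    for g, vals in groups.items():
        if len(vals) > 1:
            stats[g] = (statistics.mean(vals), statistics.stdev(vals))
        else:
            stats[g] = (0.0, 1.0)  # assumed convention: a lone response keeps its raw score
    return [(s - stats[g][0]) / (stats[g][1] + eps) for s, g in zip(scores, group_ids)]


# Two prompts ("a", "b"), two sampled responses each; 1.0 = correct, 0.0 = wrong.
adv = grpo_advantages([1.0, 0.0, 1.0, 0.0], ["a", "a", "b", "b"])
print(adv)  # approximately [0.707, -0.707, 0.707, -0.707]
```

No value network appears anywhere in this computation, which is why GRPO can drop the critic worker group.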

Engine Workers

Learn about ActorRolloutRefWorker and TrainingWorker, the distributed worker classes that PPORayTrainer orchestrates.

Model Engine

Understand the FSDP, FSDP2, and Megatron-LM backends that power each worker.
