Scaling reinforcement learning beyond a single machine requires distributing the environment-policy interaction loop across multiple workers, nodes, or cloud VMs. TorchRL ships four distributed collection strategies that all present the same iterator interface as single-process collectors:
MultiSyncCollector and MultiAsyncCollector for multi-process collection on one machine, and DistributedCollector (torch.distributed backend) and RayCollector (Ray backend) for true multi-node distribution. All of them inherit from BaseCollector, so switching from a local Collector to a distributed one requires only changing the constructor — the training loop stays identical.
All multi-process collector classes must be created inside an if __name__ == "__main__": guard on Windows and macOS, where multiprocessing uses the spawn start method by default. See the Python multiprocessing docs for background.
MultiSyncCollector
MultiSyncCollector (from torchrl.collectors) runs a configurable number of Collector workers in separate processes synchronously: it waits for every worker to finish its rollout before assembling the result and yielding it to the caller. This guarantees that every yielded batch carries the full frames_per_batch budget (exactly frames_per_batch frames when that number divides evenly among the workers) and that all workers use the same policy version, making it ideal for on-policy algorithms.
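A minimal stdlib sketch of the synchronous contract, with threads standing in for worker processes (fake_rollout and sync_collect are hypothetical names, not TorchRL API). The wait() call acts as the barrier: a batch is yielded only after every worker has returned, so no batch mixes policy versions.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def fake_rollout(worker_id, policy_version, frames):
    # Hypothetical stand-in for one worker's rollout: `frames` records, each
    # tagged with the worker id and the policy version used to collect it.
    return [(worker_id, policy_version, t) for t in range(frames)]


def sync_collect(num_workers, frames_per_worker, num_batches):
    """Yield one assembled batch per cycle, only after *all* workers finish."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for policy_version in range(num_batches):
            futures = [
                pool.submit(fake_rollout, w, policy_version, frames_per_worker)
                for w in range(num_workers)
            ]
            wait(futures)  # barrier: blocks until every worker has returned
            # Keep worker order, mimicking a leading worker dimension ("stack").
            yield [f.result() for f in futures]
            # A weight update would go here, before the next cycle starts,
            # so every rollout in the next batch uses the same new version.


if __name__ == "__main__":
    for batch in sync_collect(num_workers=3, frames_per_worker=4, num_batches=2):
        versions = {rec[1] for rollout in batch for rec in rollout}
        # prints "3 12 {0}", then "3 12 {1}"
        print(len(batch), sum(len(r) for r in batch), versions)
```

The price of this guarantee is that each cycle runs at the pace of the slowest worker.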
Constructor
Key Parameters
- A list of callables, each returning an EnvBase instance, one per worker process. All callables can be the same lambda for identical environments. Passing a single callable creates one worker.
- Policy executed at each step on every worker. Must accept a TensorDictBase. If None, a RandomPolicy is used. Mutually exclusive with policy_factory.
- Total frames per yielded batch across all workers. Each worker collects ceil(frames_per_batch / num_workers) frames, and the main process assembles (stacks or concatenates) the results.
- Total frames over the collector’s lifetime. Pass -1 for endless collection. The iterator stops once the accumulated frame count reaches or exceeds this value.
- How to combine results from multiple workers. "stack" creates a leading worker dimension; "cat" concatenates along the batch dimension. Use "cat" when workers collect different environments.
- If True, update_policy_weights_() is called automatically before every collection cycle. Convenient for on-policy algorithms that always want fresh weights.
- Fraction (0 to 1) of workers that must finish before stragglers are interrupted. Values below 1.0 reduce tail latency at the cost of slightly fewer frames from slow workers.
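The frame budget and the two assembly modes reduce to simple arithmetic. The sketch below assumes each worker returns a 1-D batch (time steps only, from a non-batched environment); per_worker_frames and assembled_shape are hypothetical helpers that apply the rules above literally, including rounding up when frames_per_batch does not divide evenly.

```python
import math


def per_worker_frames(frames_per_batch, num_workers):
    # Each worker collects ceil(frames_per_batch / num_workers) frames.
    return math.ceil(frames_per_batch / num_workers)


def assembled_shape(frames_per_batch, num_workers, mode):
    """Batch shape after assembly, assuming each worker returns shape (n,)."""
    n = per_worker_frames(frames_per_batch, num_workers)
    if mode == "stack":
        return (num_workers, n)  # new leading worker dimension
    if mode == "cat":
        return (num_workers * n,)  # joined along the existing batch dimension
    raise ValueError(f"unknown mode: {mode!r}")


if __name__ == "__main__":
    print(per_worker_frames(1000, 4))  # 250
    print(assembled_shape(1000, 4, "stack"))  # (4, 250)
    print(assembled_shape(1000, 4, "cat"))  # (1000,)
    print(assembled_shape(1000, 3, "cat"))  # (1002,)
```

Under this literal reading, three workers asked for 1000 frames return 1002; whether the library trims the surplus is not covered by this sketch.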
Usage
MultiAsyncCollector
MultiAsyncCollector (from torchrl.collectors) also fans out collection to multiple worker processes, but unlike MultiSyncCollector it does not wait for all workers to finish. Workers deliver rollouts on a first-ready basis, and the main process yields each one as it arrives. This maximizes hardware utilization and is ideal for off-policy algorithms where data staleness is acceptable.
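The first-ready pattern can be sketched with a shared queue: each worker pushes its rollout as soon as it finishes, and the main loop yields whatever arrives next. Threads stand in for processes, and worker_delays is a hypothetical knob that simulates workers of different speeds; none of this is MultiAsyncCollector's actual implementation.

```python
import queue
import threading
import time


def async_collect(worker_delays, num_batches):
    """Yield (worker_id, rollout_index) as soon as any worker finishes a rollout."""
    results = queue.Queue()
    stop = threading.Event()

    def worker(worker_id, delay):
        i = 0
        while not stop.is_set():
            time.sleep(delay)  # simulate a rollout of varying cost
            results.put((worker_id, i))  # deliver without waiting for peers
            i += 1

    threads = [
        threading.Thread(target=worker, args=(w, d), daemon=True)
        for w, d in enumerate(worker_delays)
    ]
    for t in threads:
        t.start()
    try:
        for _ in range(num_batches):
            yield results.get()  # first ready, first yielded
    finally:
        stop.set()  # ask workers to leave their loops
        for t in threads:
            t.join()


if __name__ == "__main__":
    for worker_id, idx in async_collect(worker_delays=(0.01, 0.05, 0.2), num_batches=8):
        print(f"worker {worker_id} delivered rollout {idx}")
```

Nothing in the loop waits for the slow worker, which keeps throughput high; the cost is that a yielded rollout may have been collected with older weights than the learner currently holds.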
Constructor
MultiAsyncCollector shares the same constructor signature as MultiSyncCollector. The key behavioral difference is the non-blocking collection loop: workers run continuously in parallel and the main process retrieves batches as they arrive.
DistributedCollector
DistributedCollector (from torchrl.collectors.distributed) scales collection across multiple machines using the torch.distributed backend (gloo, nccl, mpi, or ucc). Each remote node runs a Collector, MultiSyncCollector, or MultiAsyncCollector instance determined by the collector_class argument. The main process coordinates nodes through a TCPStore and assembles the results.
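Because each remote node runs its own (possibly multi-process) collector, frame budgets compound across levels. The sketch below works through that arithmetic under two assumptions taken from this section's parameter descriptions: frames_per_batch is counted per remote node, and with sync=True the main process combines one result from every node. It further assumes that a node-level MultiSyncCollector splits its budget across sub-workers by rounding up. distributed_layout is a hypothetical helper.

```python
import math


def distributed_layout(num_nodes, frames_per_batch, num_workers_per_collector=1, sync=True):
    """Per-level frame budget for a two-level collection hierarchy (a sketch)."""
    if min(num_nodes, frames_per_batch, num_workers_per_collector) < 1:
        raise ValueError("all counts must be positive")
    return {
        # frames each sub-worker on a node collects per iteration
        "frames_per_sub_worker": math.ceil(frames_per_batch / num_workers_per_collector),
        # sync=True: one combined batch from every node; sync=False: one node's result
        "frames_per_yield": frames_per_batch * num_nodes if sync else frames_per_batch,
        # rollout workers across the cluster (excluding the main process)
        "total_sub_workers": num_nodes * num_workers_per_collector,
    }


if __name__ == "__main__":
    # {'frames_per_sub_worker': 500, 'frames_per_yield': 4000, 'total_sub_workers': 8}
    print(distributed_layout(num_nodes=4, frames_per_batch=1000, num_workers_per_collector=2))
```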
Constructor
Key Parameters
- One callable per remote node; each callable returns an EnvBase. For collector_class=MultiSyncCollector, each node creates num_workers_per_collector sub-processes internally.
- Frames collected per remote node per iteration. With sync=True, the main process assembles all node results into one batch of size frames_per_batch * num_nodes.
- Class instantiated on each remote node. Accepts Collector, MultiSyncCollector, MultiAsyncCollector, or the shorthand strings "single", "sync", and "async".
- When collector_class is a multi-process class (MultiSyncCollector or MultiAsyncCollector), this sets the number of sub-workers on each remote node.
- If True, the main process waits for all nodes and yields their combined result as a single stacked TensorDict. If False, each node’s result is yielded as it arrives (first-ready, first-served).
- torch.distributed backend for weight synchronization. One of "gloo", "mpi", "nccl", or "ucc". Use "nccl" for GPU-to-GPU transfers.
- How remote processes are launched. "submitit" submits SLURM jobs (requires the submitit package) and supports multi-node clusters. "mp" uses Python multiprocessing on a single machine. "submitit_delayed" defers launch for clusters that forbid spawning from existing jobs.
- If True, automatically pushes updated policy weights to all remote nodes (sync) or only to the contributing node (async) after each collected batch.
- Dictionary mapping model identifiers (e.g. "policy") to WeightSyncScheme instances that control how weights flow from the main process to remote nodes. Defaults to DistributedWeightSyncScheme.
Usage
DistributedCollector with launcher="submitit" requires the submitit package (pip install submitit) and a running SLURM scheduler. To simulate a multi-node setup on a single machine, launcher="mp" is sufficient.
RPCCollector
RPCCollector (from torchrl.collectors.distributed) uses PyTorch’s RPC framework (torch.distributed.rpc) to coordinate remote workers. Unlike DistributedCollector, which uses the TCPStore + scatter pattern, RPCCollector makes direct RPC calls on TensorDictModule remote references, enabling fine-grained remote method invocation and tight integration with TorchRL’s @accept_remote_rref_udf_invocation decorator.
Constructor
Key Parameters
- A list of device identifiers (one per remote node) indicating which device is used to transfer data back to the main process. Needed when mixing CPU and GPU nodes.
- Keyword arguments forwarded to torch.distributed.rpc.TensorPipeRpcBackendOptions. Use this to tune transport-level settings such as num_worker_threads.
- torch.distributed backend used for weight synchronization alongside RPC. Usually "gloo" (CPU) or "nccl" (GPU).
Setup requirements
RPC initialization requires a process group to be set up before RPCCollector is created. The launcher handles this automatically; manual setups must initialize the process group themselves before constructing the collector.
RayCollector
RayCollector (from torchrl.collectors.distributed) uses Ray to distribute collection across a Ray cluster. Each remote collector runs inside a Ray actor, and RayCollector coordinates them with synchronous or asynchronous scheduling. Compared to DistributedCollector, RayCollector requires no manual process-group initialization and supports heterogeneous clusters where workers have different resource requirements.
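Heterogeneous clusters rest on a simple broadcast rule stated for this collector's arguments: a single value (such as one remote_configs dict) is replicated for every actor, while a list supplies one entry per actor. A minimal sketch, assuming "not already a list" is the only test applied; broadcast is a hypothetical helper, not RayCollector's code.

```python
import copy


def broadcast(value, num_collectors):
    """Return one entry per actor: replicate a single value, validate a list."""
    if isinstance(value, list):
        if len(value) != num_collectors:
            raise ValueError(f"expected {num_collectors} entries, got {len(value)}")
        return value  # already per-actor
    # Deep-copy so that mutating one actor's config cannot leak into another's.
    return [copy.deepcopy(value) for _ in range(num_collectors)]


if __name__ == "__main__":
    default = {"num_cpus": 1, "num_gpus": 0.2, "memory": 2 * 1024**3}
    print(broadcast(default, 3))
    heterogeneous = [{"num_cpus": 8, "num_gpus": 1}, {"num_cpus": 1, "num_gpus": 0}]
    print(broadcast(heterogeneous, 2)[0])
```

In Ray, a fractional request such as num_gpus=0.2 lets the scheduler place up to five such actors on one GPU, which is how the default resource spec can co-locate several collectors on a single device.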
Constructor
Key Parameters
- List of callables, each creating an EnvBase instance. Its length determines the number of remote collector actors unless num_collectors is provided.
- Explicit number of remote collector actors to create. When set, create_env_fn, collector_kwargs, and remote_configs are broadcast to all num_collectors actors if they are not already lists.
- If True, all actors must finish their rollout before the main process assembles and yields the combined TensorDict. If False, each actor’s result is yielded first-ready.
- Kwargs forwarded to ray.init(). If None, Ray auto-detects an existing cluster or starts a local one. Set address="auto" to connect to an existing cluster.
- Resource specifications for ray.remote(), controlling CPU, GPU, and memory per actor. Defaults to {"num_cpus": 1, "num_gpus": 0.2, "memory": 2 * 1024**3}. A single dict is broadcast to all actors; a list assigns per-actor resources.
- When provided, remote actors write directly to this RayReplayBuffer instead of returning data to the main process. Must be a RayReplayBuffer; regular ReplayBuffer instances cannot be shared across Ray actor boundaries.
- Mapping from model IDs to WeightSyncScheme instances for pushing weights to remote actors. Defaults to {"policy": RayWeightSyncScheme()}.
Usage
Choosing a Collector
Single machine
| Scenario | Recommended Collector |
|---|---|
| On-policy, single env | Collector |
| On-policy, multiple envs | MultiSyncCollector |
| Off-policy, multiple envs | MultiAsyncCollector |
| Off-policy, async fill of replay buffer | AsyncCollector or MultiAsyncCollector |
Weight Update Strategies
All distributed collectors push fresh policy weights to remote workers through update_policy_weights_(). The underlying mechanism is controlled by the weight_sync_schemes argument.
VanillaWeightUpdater
The simplest updater: it performs an in-place weight copy on each worker’s policy. Suitable when workers share memory (e.g., MultiSyncCollector on one machine). The constructor requires policy_weights, a locked TensorDict of the policy’s parameters. Use the from_policy classmethod for a convenient one-liner.
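Why the copy has to be in place: a worker that shares memory with the trainer holds a reference to the same buffers, so an update must overwrite their values rather than swap in new objects. A minimal pure-Python sketch, with lists standing in for tensors and inplace_update a hypothetical helper (not VanillaWeightUpdater's code); refusing to add or drop names loosely mirrors the role of the locked TensorDict.

```python
def inplace_update(target_weights, source_weights):
    """Overwrite existing buffers element-wise instead of rebinding them."""
    if target_weights.keys() != source_weights.keys():
        # Loose analogue of a locked TensorDict: names cannot be added or removed.
        raise KeyError("weight names differ between source and target")
    for name, new_values in source_weights.items():
        buffer = target_weights[name]
        if len(buffer) != len(new_values):
            raise ValueError(f"size mismatch for {name!r}")
        buffer[:] = new_values  # slice assignment mutates the shared object


if __name__ == "__main__":
    policy_weights = {"weight": [0.0, 0.0], "bias": [0.0]}
    worker_view = policy_weights["weight"]  # a worker holding a reference
    inplace_update(policy_weights, {"weight": [1.0, 2.0], "bias": [0.5]})
    print(worker_view)  # [1.0, 2.0]: the worker sees the new values
```

Rebinding instead (policy_weights["weight"] = [1.0, 2.0]) would leave worker_view pointing at the stale list.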
DistributedWeightSyncScheme (torch.distributed)
Used by DistributedCollector by default. Sends weights over the torch.distributed process group using isend/irecv for non-blocking transfer.
RayWeightUpdater / RayWeightSyncScheme
Used by RayCollector by default. Transfers weights to remote actors through Ray’s object store.
Multi-level hierarchies
Weight updates cascade automatically in hierarchical setups. A DistributedCollector with MultiSyncCollector sub-workers will push weights from the main process → distributed node → sub-workers using the appropriate scheme at each level.
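The cascade can be pictured as a tree walk. In a real hierarchy each hop would use its own scheme (for example, a torch.distributed transfer to each node, then an in-place copy to that node's sub-workers); this sketch, built on a hypothetical CollectorNode class, collapses every hop into a method call so that only the propagation order remains.

```python
class CollectorNode:
    """Hypothetical node in a collector hierarchy: stores weights and forwards them."""

    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)
        self.weights = None

    def receive(self, weights, trace):
        self.weights = weights  # this level's own update
        trace.append(self.name)
        for child in self.children:  # then cascade one level down
            child.receive(weights, trace)


def push_from_main(top_level_nodes, weights):
    """Main process → distributed nodes → sub-workers; returns the visit order."""
    trace = []
    for node in top_level_nodes:
        node.receive(weights, trace)
    return trace


if __name__ == "__main__":
    nodes = [
        CollectorNode(f"node{n}", [CollectorNode(f"node{n}/worker{w}") for w in range(2)])
        for n in range(2)
    ]
    # ['node0', 'node0/worker0', 'node0/worker1', 'node1', 'node1/worker0', 'node1/worker1']
    print(push_from_main(nodes, {"version": 1}))
```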
Setup: torch.distributed and RPC