Documentation Index
Fetch the complete documentation index at: https://mintlify.com/MilesONerd/neurenix/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Neurenix provides comprehensive support for distributed training across multiple GPUs and compute nodes. The framework integrates with industry-standard distributed training backends including MPI, Horovod, and DeepSpeed.
MPI Backend
The Message Passing Interface (MPI) backend provides low-level distributed computing primitives for parallel training.
MPIManager
The MPIManager class provides an interface to MPI functionality for distributed training.
from neurenix.distributed.mpi import MPIManager, get_mpi_manager
# Initialize MPI manager
mpi = MPIManager(
backend="openmpi", # 'openmpi', 'mpich', 'intelmpi'
init_method="env", # 'env', 'tcp', 'file'
timeout=1800.0
)
# Access distributed info
print(f"Rank: {mpi.rank}/{mpi.world_size}")
print(f"Local rank: {mpi.local_rank}")
print(f"Is master: {mpi.is_master}")
Reference: neurenix/distributed/mpi.py:15
Collective Operations
Barrier Synchronization
# Synchronize all processes
mpi.barrier()
Reference: neurenix/distributed/mpi.py:135
Broadcast
# Broadcast data from rank 0 to all processes
data = {"learning_rate": 0.01, "batch_size": 32}
data = mpi.broadcast(data, src=0)
Reference: neurenix/distributed/mpi.py:148
All-Reduce
# Aggregate gradients across all processes
gradients = model.get_gradients()
aggregated = mpi.all_reduce(gradients, op="sum") # 'sum', 'prod', 'min', 'max', 'avg'
Reference: neurenix/distributed/mpi.py:168
All-Gather
# Gather data from all processes
local_metrics = {"loss": 0.5, "accuracy": 0.95}
all_metrics = mpi.all_gather(local_metrics)
Reference: neurenix/distributed/mpi.py:188
Scatter
# Distribute data from rank 0 to all processes
if mpi.is_master:
data_splits = [batch1, batch2, batch3, batch4]
else:
data_splits = None
local_data = mpi.scatter(data_splits, src=0)
Reference: neurenix/distributed/mpi.py:207
Context Manager
with MPIManager() as mpi:
# Training code here
model = create_model()
train(model)
# MPI automatically finalized
Reference: neurenix/distributed/mpi.py:227
Horovod Backend
Horovod provides a unified API for distributed training with automatic gradient aggregation.
HorovodManager
from neurenix.distributed.horovod import HorovodManager, get_horovod_manager
# Initialize Horovod
hvd = HorovodManager(
backend="auto", # 'mpi', 'gloo', 'nccl', 'auto'
init_method="env",
timeout=1800.0
)
print(f"Horovod rank: {hvd.rank}/{hvd.world_size}")
Reference: neurenix/distributed/horovod.py:15
Broadcasting Parameters
import neurenix as nx
# Broadcast model parameters from rank 0
model = create_model()
hvd.broadcast_parameters(model, root_rank=0)
Reference: neurenix/distributed/horovod.py:207
Distributed Optimizer
# Wrap optimizer for distributed training
optimizer = nx.optim.Adam(model.parameters(), lr=0.001)
dist_optimizer = hvd.distributed_optimizer(
optimizer,
compression="fp16", # Gradient compression
backward_passes_per_step=1, # Gradient accumulation
op="sum" # Reduction operation
)
Reference: neurenix/distributed/horovod.py:224
Complete Training Example
import neurenix as nx
from neurenix.distributed.horovod import get_horovod_manager
# Initialize Horovod
hvd = get_horovod_manager()
# Create model and optimizer
model = create_model().to(f"cuda:{hvd.local_rank}")
optimizer = nx.optim.SGD(model.parameters(), lr=0.01 * hvd.world_size)
# Broadcast initial parameters
hvd.broadcast_parameters(model, root_rank=0)
# Wrap optimizer
optimizer = hvd.distributed_optimizer(optimizer)
# Training loop
for epoch in range(num_epochs):
for batch in train_loader:
optimizer.zero_grad()
loss = model(batch)
loss.backward()
optimizer.step() # Gradients automatically aggregated
# Synchronize before evaluation
hvd.barrier()
if hvd.is_master:
evaluate(model, val_loader)
DeepSpeed Backend
DeepSpeed provides advanced optimization techniques including ZeRO optimization, pipeline parallelism, and mixed precision training.
DeepSpeedManager
from neurenix.distributed.deepspeed import DeepSpeedManager, get_deepspeed_manager
# Initialize DeepSpeed
ds_config = {
"train_batch_size": 32,
"gradient_accumulation_steps": 1,
"optimizer": {
"type": "Adam",
"params": {"lr": 0.001}
}
}
ds = DeepSpeedManager(
backend="nccl",
config=ds_config,
timeout=1800.0
)
Reference: neurenix/distributed/deepspeed.py:15
Initialize Model with DeepSpeed
import neurenix as nx
model = create_model()
optimizer = nx.optim.Adam(model.parameters(), lr=0.001)
# Initialize with DeepSpeed
ds_engine = ds.initialize_model(
model=model,
optimizer=optimizer,
training_batch_size=32,
gradient_accumulation_steps=4,
fp16=True, # FP16 mixed precision
zero_optimization=True, # Enable ZeRO
zero_stage=2, # ZeRO stage (1, 2, or 3)
offload_optimizer=True, # Offload optimizer to CPU
offload_parameters=False # Keep parameters on GPU
)
Reference: neurenix/distributed/deepspeed.py:152
ZeRO Optimization Stages
ZeRO Stage 1: Optimizer state partitioning
- Partitions optimizer states across GPUs
- Reduces model-state memory by up to ~4x when training with Adam in mixed precision
ZeRO Stage 2: Gradient partitioning
- Partitions gradients in addition to optimizer states
- Further reduces memory usage
ZeRO Stage 3: Parameter partitioning
- Partitions model parameters in addition to gradients and optimizer states
- Enables training very large models
# ZeRO Stage 3 example
ds_engine = ds.initialize_model(
model=model,
optimizer=optimizer,
zero_optimization=True,
zero_stage=3,
offload_optimizer=True,
offload_parameters=True # For extremely large models
)
Training with DeepSpeed
import neurenix as nx
from neurenix.distributed.deepspeed import get_deepspeed_manager
# Initialize DeepSpeed
ds = get_deepspeed_manager()
# Create and initialize model
model = create_model()
optimizer = nx.optim.Adam(model.parameters(), lr=0.001)
ds_engine = ds.initialize_model(
model=model,
optimizer=optimizer,
fp16=True,
zero_optimization=True,
zero_stage=2
)
# Training loop
for epoch in range(num_epochs):
for batch in train_loader:
loss = ds_engine(batch)
ds_engine.backward(loss)
ds_engine.step()
Data Parallel Training
For simple multi-GPU training on a single node:
import neurenix as nx
from neurenix.distributed import DataParallel
# Wrap model for data parallel training
model = create_model()
model = DataParallel(model, device_ids=[0, 1, 2, 3])
# Training proceeds normally
for batch in train_loader:
optimizer.zero_grad()
loss = model(batch)
loss.backward()
optimizer.step()
Reference: neurenix/distributed/__init__.py:9
Best Practices
Device Placement
# Automatic device selection based on local rank
if hvd.local_rank >= 0:
device = nx.device(f"cuda:{hvd.local_rank}")
else:
device = nx.device("cpu")
model = model.to(device)
Learning Rate Scaling
# Scale learning rate with number of workers
base_lr = 0.001
scaled_lr = base_lr * hvd.world_size
optimizer = nx.optim.SGD(model.parameters(), lr=scaled_lr)
Gradient Accumulation
# Accumulate gradients over multiple steps
accumulation_steps = 4
for i, batch in enumerate(train_loader):
loss = model(batch) / accumulation_steps
loss.backward()
if (i + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
Checkpointing
# Save checkpoint on master process only
if hvd.is_master:
checkpoint = {
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': loss,
}
nx.save(checkpoint, 'checkpoint.pth')
# Synchronize before continuing
hvd.barrier()
Environment Variables
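Common environment variables for distributed training. The OMPI_COMM_WORLD_* variables are normally set by mpirun for each launched process; they are shown here for reference rather than as values to export by hand: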
# MPI
export OMPI_COMM_WORLD_SIZE=4
export OMPI_COMM_WORLD_RANK=0
export OMPI_COMM_WORLD_LOCAL_RANK=0
# Horovod
export HOROVOD_TIMELINE=timeline.json
export HOROVOD_AUTOTUNE=1
# DeepSpeed
export DEEPSPEED_CONFIG=ds_config.json
# NCCL (for GPU communication)
export NCCL_DEBUG=INFO
export NCCL_IB_DISABLE=1 # Disable InfiniBand if not available
Launching Distributed Training
MPI Launch
mpirun -np 4 -H node1:2,node2:2 python train.py
Horovod Launch
horovodrun -np 4 -H localhost:4 python train.py
DeepSpeed Launch
deepspeed --num_gpus=4 train.py --deepspeed_config=ds_config.json
Performance Tips
- Use the NCCL backend for GPU communication (fastest)
- Enable tensor fusion to reduce communication overhead
- Use gradient compression (FP16) to reduce bandwidth
- Batch data loading to maximize GPU utilization
- Profile communication to identify bottlenecks
- Use InfiniBand for multi-node training when available