Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/MilesONerd/neurenix/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Neurenix provides comprehensive support for distributed training across multiple GPUs and compute nodes. The framework integrates with industry-standard distributed training backends including MPI, Horovod, and DeepSpeed.

MPI Backend

The Message Passing Interface (MPI) backend provides low-level distributed computing primitives for parallel training.

MPIManager

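The MPIManager class provides an interface to MPI functionality for distributed training. The example below creates a manager and reads the process's rank, local rank, world size, and master status: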
from neurenix.distributed.mpi import MPIManager, get_mpi_manager

# Initialize MPI manager
mpi = MPIManager(
    backend="openmpi",  # 'openmpi', 'mpich', 'intelmpi'
    init_method="env",   # 'env', 'tcp', 'file'
    timeout=1800.0
)

# Access distributed info
print(f"Rank: {mpi.rank}/{mpi.world_size}")
print(f"Local rank: {mpi.local_rank}")
print(f"Is master: {mpi.is_master}")
Reference: neurenix/distributed/mpi.py:15

Collective Operations

Barrier Synchronization

# Synchronize all processes
mpi.barrier()
Reference: neurenix/distributed/mpi.py:135

Broadcast

# Broadcast data from rank 0 to all processes
data = {"learning_rate": 0.01, "batch_size": 32}
data = mpi.broadcast(data, src=0)
Reference: neurenix/distributed/mpi.py:148

All-Reduce

# Aggregate gradients across all processes
gradients = model.get_gradients()
aggregated = mpi.all_reduce(gradients, op="sum")  # 'sum', 'prod', 'min', 'max', 'avg'
Reference: neurenix/distributed/mpi.py:168

All-Gather

# Gather data from all processes
local_metrics = {"loss": 0.5, "accuracy": 0.95}
all_metrics = mpi.all_gather(local_metrics)
Reference: neurenix/distributed/mpi.py:188

Scatter

# Distribute data from rank 0 to all processes
if mpi.is_master:
    data_splits = [batch1, batch2, batch3, batch4]
else:
    data_splits = None

local_data = mpi.scatter(data_splits, src=0)
Reference: neurenix/distributed/mpi.py:207

Context Manager

with MPIManager() as mpi:
    # Training code here
    model = create_model()
    train(model)
# MPI automatically finalized
Reference: neurenix/distributed/mpi.py:227

Horovod Backend

Horovod provides a unified API for distributed training with automatic gradient aggregation.

HorovodManager

from neurenix.distributed.horovod import HorovodManager, get_horovod_manager

# Initialize Horovod
hvd = HorovodManager(
    backend="auto",      # 'mpi', 'gloo', 'nccl', 'auto'
    init_method="env",
    timeout=1800.0
)

print(f"Horovod rank: {hvd.rank}/{hvd.world_size}")
Reference: neurenix/distributed/horovod.py:15

Broadcasting Parameters

import neurenix as nx

# Broadcast model parameters from rank 0
model = create_model()
hvd.broadcast_parameters(model, root_rank=0)
Reference: neurenix/distributed/horovod.py:207

Distributed Optimizer

# Wrap optimizer for distributed training
optimizer = nx.optim.Adam(model.parameters(), lr=0.001)

dist_optimizer = hvd.distributed_optimizer(
    optimizer,
    compression="fp16",              # Gradient compression
    backward_passes_per_step=1,      # Gradient accumulation
    op="sum"                          # Reduction operation
)
Reference: neurenix/distributed/horovod.py:224

Complete Training Example

import neurenix as nx
from neurenix.distributed.horovod import get_horovod_manager

# Initialize Horovod
hvd = get_horovod_manager()

# Create model and optimizer
model = create_model().to(f"cuda:{hvd.local_rank}")
optimizer = nx.optim.SGD(model.parameters(), lr=0.01 * hvd.world_size)

# Broadcast initial parameters
hvd.broadcast_parameters(model, root_rank=0)

# Wrap optimizer
optimizer = hvd.distributed_optimizer(optimizer)

# Training loop
for epoch in range(num_epochs):
    for batch in train_loader:
        optimizer.zero_grad()
        loss = model(batch)
        loss.backward()
        optimizer.step()  # Gradients automatically aggregated
    
    # Synchronize before evaluation
    hvd.barrier()
    
    if hvd.is_master:
        evaluate(model, val_loader)

DeepSpeed Backend

DeepSpeed provides advanced optimization techniques, including ZeRO optimization, pipeline parallelism, and mixed-precision training.

DeepSpeedManager

from neurenix.distributed.deepspeed import DeepSpeedManager, get_deepspeed_manager

# Initialize DeepSpeed
ds_config = {
    "train_batch_size": 32,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    }
}

ds = DeepSpeedManager(
    backend="nccl",
    config=ds_config,
    timeout=1800.0
)
Reference: neurenix/distributed/deepspeed.py:15

Initialize Model with DeepSpeed

import neurenix as nx

model = create_model()
optimizer = nx.optim.Adam(model.parameters(), lr=0.001)

# Initialize with DeepSpeed
ds_engine = ds.initialize_model(
    model=model,
    optimizer=optimizer,
    training_batch_size=32,
    gradient_accumulation_steps=4,
    fp16=True,                    # FP16 mixed precision
    zero_optimization=True,       # Enable ZeRO
    zero_stage=2,                 # ZeRO stage (1, 2, or 3)
    offload_optimizer=True,       # Offload optimizer to CPU
    offload_parameters=False      # Keep parameters on GPU
)
Reference: neurenix/distributed/deepspeed.py:152

ZeRO Optimization Stages

ZeRO Stage 1: Optimizer state partitioning
  • Partitions optimizer states across GPUs
  • Reduces memory by ~4x for Adam
ZeRO Stage 2: Gradient partitioning
  • Partitions gradients in addition to optimizer states
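  • Further reduces per-GPU memory usage compared with Stage 1
ZeRO Stage 3: Parameter partitioning
  • Partitions model parameters in addition to gradients and optimizer states
  • Enables training models that are too large to fit in a single GPU's memory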
# ZeRO Stage 3 example
ds_engine = ds.initialize_model(
    model=model,
    optimizer=optimizer,
    zero_optimization=True,
    zero_stage=3,
    offload_optimizer=True,
    offload_parameters=True  # For extremely large models
)

Training with DeepSpeed

import neurenix as nx
from neurenix.distributed.deepspeed import get_deepspeed_manager

# Initialize DeepSpeed
ds = get_deepspeed_manager()

# Create and initialize model
model = create_model()
optimizer = nx.optim.Adam(model.parameters(), lr=0.001)

ds_engine = ds.initialize_model(
    model=model,
    optimizer=optimizer,
    fp16=True,
    zero_optimization=True,
    zero_stage=2
)

# Training loop
for epoch in range(num_epochs):
    for batch in train_loader:
        loss = ds_engine(batch)
        ds_engine.backward(loss)
        ds_engine.step()

Data Parallel Training

For simple multi-GPU training on a single node, wrap the model in DataParallel:
import neurenix as nx
from neurenix.distributed import DataParallel

# Wrap model for data parallel training
model = create_model()
model = DataParallel(model, device_ids=[0, 1, 2, 3])

# Training proceeds normally
for batch in train_loader:
    optimizer.zero_grad()
    loss = model(batch)
    loss.backward()
    optimizer.step()
Reference: neurenix/distributed/__init__.py:9

Best Practices

Device Placement

# Automatic device selection based on local rank
if hvd.local_rank >= 0:
    device = nx.device(f"cuda:{hvd.local_rank}")
else:
    device = nx.device("cpu")

model = model.to(device)

Learning Rate Scaling

# Scale learning rate with number of workers
base_lr = 0.001
scaled_lr = base_lr * hvd.world_size

optimizer = nx.optim.SGD(model.parameters(), lr=scaled_lr)

Gradient Accumulation

# Accumulate gradients over multiple steps
accumulation_steps = 4

for i, batch in enumerate(train_loader):
    loss = model(batch) / accumulation_steps
    loss.backward()
    
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

Checkpointing

# Save checkpoint on master process only
if hvd.is_master:
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }
    nx.save(checkpoint, 'checkpoint.pth')

# Synchronize before continuing
hvd.barrier()

Environment Variables

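Common environment variables for distributed training (the OMPI_COMM_WORLD_* variables are normally set for each process by mpirun; they are shown here for reference):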
# MPI
export OMPI_COMM_WORLD_SIZE=4
export OMPI_COMM_WORLD_RANK=0
export OMPI_COMM_WORLD_LOCAL_RANK=0

# Horovod
export HOROVOD_TIMELINE=timeline.json
export HOROVOD_AUTOTUNE=1

# DeepSpeed
export DEEPSPEED_CONFIG=ds_config.json

# NCCL (for GPU communication)
export NCCL_DEBUG=INFO
export NCCL_IB_DISABLE=1  # Disable InfiniBand if not available

Launching Distributed Training

MPI Launch

mpirun -np 4 -H node1:2,node2:2 python train.py

Horovod Launch

horovodrun -np 4 -H localhost:4 python train.py

DeepSpeed Launch

deepspeed --num_gpus=4 train.py --deepspeed_config=ds_config.json

Performance Tips

  1. Use the NCCL backend for GPU communication (typically the fastest option)
  2. Enable tensor fusion to reduce communication overhead
  3. Use gradient compression (FP16) to reduce network bandwidth usage
  4. Load data in batches to keep GPUs fully utilized
  5. Profile communication to identify bottlenecks
  6. Use InfiniBand for multi-node training when available

Build docs developers (and LLMs) love