Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/MilesONerd/neurenix/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Data loaders provide efficient batching, shuffling, and parallel loading of datasets for training and evaluation.

DataLoader Class

class DataLoader:
    def __init__(
        self,
        dataset: Dataset,
        batch_size: int = 1,
        shuffle: bool = False,
        num_workers: int = 0,
        pin_memory: bool = False,
        drop_last: bool = False,
        collate_fn: Optional[Callable] = None,
    )

Parameters

dataset
Dataset
required
Dataset to load data from.
batch_size
int
default:"1"
Number of samples per batch.
shuffle
bool
default:"False"
Whether to shuffle the data at the beginning of each epoch.
num_workers
int
default:"0"
Number of worker processes for parallel data loading. 0 means data will be loaded in the main process.
pin_memory
bool
default:"False"
If True, the data loader will copy tensors into CUDA pinned memory before returning them. Useful for GPU training.
drop_last
bool
default:"False"
Whether to drop the last incomplete batch if the dataset size is not divisible by the batch size.
collate_fn
Optional[Callable]
Function to merge a list of samples into a batch. If None, uses default collation.

Methods

__iter__

def __iter__(self)
Return an iterator that yields batches from the dataset.

__len__

def __len__(self) -> int
Return the number of batches.
return
int
Number of batches in the data loader.

DistributedDataLoader

class DistributedDataLoader(DataLoader):
    def __init__(
        self,
        dataset: Dataset,
        batch_size: int = 1,
        shuffle: bool = False,
        num_workers: int = 0,
        rank: int = 0,
        world_size: int = 1,
        **kwargs
    )
Data loader for distributed training across multiple devices.

Additional Parameters

rank
int
default:"0"
Rank of the current process in distributed training.
world_size
int
default:"1"
Total number of processes in distributed training.

Utility Functions

default_collate

def default_collate(batch: List[Any]) -> Any
Default collation function that stacks samples into batches.
batch
List[Any]
required
List of samples to collate.
return
Any
Collated batch.

worker_init_fn

def worker_init_fn(worker_id: int) -> None
Initialization function for data loader workers.

Example Usage

import neurenix as nx
from neurenix.data import Dataset, DataLoader, load_dataset

# Load the training dataset from a CSV file
dataset = load_dataset("train_data.csv")

print(f"Dataset size: {len(dataset)}")

# Create a shuffled, multi-worker data loader for training
train_loader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    pin_memory=True,  # For GPU training
    drop_last=True  # Drop the last incomplete batch
)

print(f"Number of batches: {len(train_loader)}")

# Iterate over batches.
# NOTE: model, criterion and optimizer are not defined in this snippet;
# create them before running the loop.
for epoch in range(10):
    for batch_idx, batch in enumerate(train_loader):
        # Each batch is unpacked into an (inputs, labels) pair
        inputs, labels = batch
        
        # Training step: forward pass, loss, backward pass, parameter update
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        
        # Log progress every 100 batches
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")

# Custom collate function
def custom_collate(batch):
    """Merge a list of (data, label) samples into a pair of stacked tensors."""
    # Split the samples into parallel lists of inputs and targets
    inputs = [sample[0] for sample in batch]
    targets = [sample[1] for sample in batch]

    # Stack each list into one tensor and return both as a tuple
    return nx.Tensor.stack(inputs), nx.Tensor.stack(targets)

# Use the custom collate function to assemble each batch
custom_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=True,
    collate_fn=custom_collate
)

# Distributed training
from neurenix.data import DistributedDataLoader

rank = 0  # Rank of the current process (hard-coded here for illustration)
world_size = 4  # Total number of processes (one per GPU in this example)

dist_loader = DistributedDataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,
    rank=rank,
    world_size=world_size
)

for batch in dist_loader:
    # Each process gets a different subset
    # NOTE(review): the training loop above unpacks each batch into
    # (inputs, labels); here the whole batch is passed to the model —
    # confirm which form matches the dataset.
    outputs = model(batch)

# Variable length sequences
def pad_collate(batch):
    """Collate function for variable length sequences.

    Each sample is indexed as ``(sequence, label)``. Every sequence is
    extended along its first dimension with zeros until it is as long as
    the longest sequence in the batch. The padded sequences are then
    stacked, and the labels are wrapped in a tensor.
    """
    sequences = [sample[0] for sample in batch]
    labels = [sample[1] for sample in batch]

    # The longest sequence sets the padded length for the whole batch
    target_len = max(map(len, sequences))

    def pad(seq):
        # Zero block covering the missing steps; trailing dims match seq
        filler = nx.Tensor.zeros((target_len - len(seq),) + seq.shape[1:])
        return nx.Tensor.cat([seq, filler])

    return nx.Tensor.stack([pad(seq) for seq in sequences]), nx.Tensor(labels)

# Batch variable-length sequences using the padding collate function.
# NOTE: sequence_dataset is not defined in this snippet.
seq_loader = DataLoader(
    sequence_dataset,
    batch_size=16,
    collate_fn=pad_collate
)

# Validation loader (no shuffle)
# NOTE: val_dataset is not defined in this snippet.
val_loader = DataLoader(
    val_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=4
)

# Switch the model to evaluation mode and run inference inside no_grad()
# (presumably disables gradient tracking — confirm against the Tensor API).
model.eval()
with nx.Tensor.no_grad():
    for batch in val_loader:
        predictions = model(batch)
        # Evaluate predictions

Performance Tips

num_workers: Start with 2-8 workers and tune from there. Too many workers add process start-up and memory overhead.
pin_memory: Enable for GPU training to speed up host-to-device transfers.
prefetch: DataLoader automatically prefetches batches in the background for better throughput.
batch_size: Larger batch sizes improve GPU utilization but require more memory. Find the sweet spot for your hardware.

Common Patterns

Training Loop

# One training pass followed by one validation pass per epoch.
# NOTE: num_epochs, train_step and validate_step are user-supplied and are
# not defined in this snippet.
for epoch in range(num_epochs):
    model.train()
    for batch in train_loader:
        # Training step
        loss = train_step(model, batch)
    
    # Validate with the model in evaluation mode, inside no_grad()
    model.eval()
    with nx.Tensor.no_grad():
        for batch in val_loader:
            # Validation step
            val_loss = validate_step(model, batch)

Multi-GPU Training

# Wrap model for data parallel training on devices 0-3
model = nx.DataParallel(model, device_ids=[0, 1, 2, 3])

# A regular DataLoader is used here; each full batch is presumably split
# across the devices by the DataParallel wrapper, not by the loader — confirm.
train_loader = DataLoader(
    dataset,
    batch_size=128,  # Total batch size across all GPUs
    shuffle=True,
    num_workers=8
)

for batch in train_loader:
    outputs = model(batch)  # Automatically parallelized

Build docs developers (and LLMs) love