Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/Wenyueh/MinivLLM/llms.txt

Use this file to discover all available pages before exploring further.

MiniVLLM uses tensor parallelism to distribute a single model across multiple GPUs. Each GPU holds a shard of every weight matrix and cooperates with the other GPUs using NCCL collectives.

How tensor parallelism works

A standard linear layer computes Y = X W. With tensor parallelism the weight matrix W is split across GPUs and the computation is distributed:
W is split along dim=0 (output features). Each GPU computes a slice of the output independently — no communication is needed during the forward pass.
GPU 0: Y₀ = X W₀    # first half of output features
GPU 1: Y₁ = X W₁    # second half of output features
Used for: Q, K, V projections and MLP gate/up projections.
The two types are always paired: a column-parallel layer shards the output, which becomes the sharded input consumed by the following row-parallel layer. The only cross-GPU communication is the dist.all_reduce at the end of each row-parallel layer.

Linear layer variants

ColumnParallelLinear

Each GPU stores output_size // tp_size output rows. The weight loader extracts the shard corresponding to the current rank.
class ColumnParallelLinear(LinearBase):
    def __init__(self, input_size: int, output_size: int, bias: bool = True):
        tp_size = dist.get_world_size()
        # Weight shape per GPU: (output_size // tp_size, input_size)
        super().__init__(input_size, output_size // tp_size, bias, tp_dim=0)

    def weight_loader(self, param, loaded_weights):
        shard_size = loaded_weights.size(0) // self.tp_size
        start = self.tp_rank * shard_size
        param.data.copy_(loaded_weights.narrow(0, start, shard_size))

    def forward(self, x):
        return nn.functional.linear(x, self.weight, self.bias)
        # No communication — each GPU produces its slice of the output

RowParallelLinear

Each GPU stores input_size // tp_size input columns. A dist.all_reduce sums partial results after the matrix multiply.
class RowParallelLinear(LinearBase):
    def __init__(self, input_size: int, output_size: int, bias: bool = True):
        tp_size = dist.get_world_size()
        # Weight shape per GPU: (output_size, input_size // tp_size)
        super().__init__(input_size // tp_size, output_size, bias, tp_dim=1)

    def weight_loader(self, param, loaded_weights):
        shard_size = loaded_weights.size(1) // self.tp_size
        start = self.tp_rank * shard_size
        param.data.copy_(loaded_weights.narrow(1, start, shard_size))

    def forward(self, x):
        result = nn.functional.linear(x, self.weight, self.bias)
        if self.tp_size > 1:
            dist.all_reduce(result, op=dist.ReduceOp.SUM)
        return result  # Full output, replicated on all GPUs

QKVColumnParallelLinear

For attention, the column split must respect head boundaries. Each GPU handles complete attention heads rather than fractional ones:
class QKVColumnParallelLinear(ColumnParallelLinear):
    def __init__(
        self,
        input_size: int,
        head_size: int,
        num_heads: int,
        num_kv_heads: int | None = None,
        bias: bool = False,
    ):
        self.num_heads     = num_heads     // tp_size  # per-GPU Q heads
        self.num_kv_heads  = num_kv_heads  // tp_size  # per-GPU KV heads
        total_output = head_size * (num_heads + 2 * num_kv_heads)
        super().__init__(input_size, total_output, bias=bias)
The weight loader accepts load_weight_id ('q', 'k', or 'v') to identify which sub-matrix is being loaded and computes the correct offset within the merged QKV tensor.

MergedColumnParallelLinear

Holds the gate and up MLP projections in a single weight tensor. The weight loader is called once per sub-matrix with a loaded_weight_id integer (0 for gate, 1 for up):
self.gate_up = MergedColumnParallelLinear(
    input_size=hidden_size,
    output_sizes=[intermediate_size, intermediate_size],
)

Data flow through an attention block

Input x: (batch, seq, hidden_size)  — replicated on all GPUs


QKVColumnParallelLinear             — column parallel: no communication
        │  each GPU computes its Q/K/V head slice

q_norm, k_norm, RotaryEmbedding     — local ops: no communication


Attention (flash / paged)           — local ops: each GPU attends its own heads
        │  o: (batch, num_heads_per_gpu * head_dim)

RowParallelLinear (o_proj)          — row parallel: dist.all_reduce sums partials


Output: (batch, hidden_size)        — replicated on all GPUs

Enabling multi-GPU inference

Change world_size in the config dict passed to LLMEngine:
config = {
    "model_name_or_path": "/path/to/Qwen3-0.6B",
    "world_size": 2,   # set to the number of GPUs
    "block_size": 256,
    # ... other model parameters
}
engine = LLMEngine(config)
All of num_heads, num_kv_heads, and vocab_size must be divisible by world_size. The model layers assert this at construction time.

Process group initialization

Every process (rank 0 and workers) calls dist.init_process_group inside ModelRunner.__init__:
class ModelRunner:
    def __init__(self, config, rank, event):
        dist.init_process_group(
            'nccl',
            'tcp://localhost:12345',
            world_size=config['world_size'],
            rank=rank,
        )
        torch.cuda.set_device(rank)
dist.init_process_group is a collective barrier — rank 0 blocks until every worker rank has called it. This guarantees that all NCCL channels are established before any collective operation (e.g. all_reduce, all_gather) is executed.

Worker process lifecycle

[LLMEngine.__init__]

  ├─ spawn worker processes (rank 1 … N)
  │    └─ worker_process(config, rank, event)
  │         └─ ModelRunner(config, rank, event)  ← blocks at dist.init_process_group
  │              └─ model_runner.loop()           ← enters event loop

  └─ ModelRunner(config, rank=0, events)         ← blocks at dist.init_process_group
       │  (all ranks joined — barrier clears)
       ├─ warmup_model()
       ├─ allocate_kv_cache()                    ← dist.all_reduce(MIN) for block count
       ├─ capture_cudagraph()
       └─ dist.barrier()  →  create SharedMemory
After initialization, the worker loop runs:
def loop(self):   # workers only
    while True:
        method_name, args = self.read_shm()   # block on event
        self.call(method_name, *args)          # execute (e.g. "run")
        if method_name == 'exit':
            self.exit()
            break

Shared memory communication

Rank 0 (the scheduler process) communicates with worker ranks through a POSIX shared memory segment named myvllm. This avoids serializing data through the OS socket used by NCCL.
1

Rank 0 writes a call

def write_shm(self, method_name: str, args: tuple):
    data = pickle.dumps((method_name, *args))
    n = len(data)
    self.shm.buf[:4] = n.to_bytes(4, 'little')   # 4-byte length prefix
    self.shm.buf[4:n+4] = data
    for event in self.events:   # one event per worker
        event.set()             # signal each worker
2

Worker reads the call

def read_shm(self):
    self.event.wait()           # block until rank 0 signals
    n = int.from_bytes(self.shm.buf[:4], 'little')
    method_name, *args = pickle.loads(self.shm.buf[4:n+4])
    self.event.clear()          # reset for next message
    return method_name, args
3

Worker executes the method

# Inside loop()
self.call(method_name, *args)
# call() → getattr(self, method_name)(*args)
# e.g. self.run(seqs, is_prefill)
Both rank 0 and workers call self.run(...). Because their model weights are sharded identically, all collective operations (all_reduce, etc.) execute in lock-step across ranks.
The shared memory segment is created by rank 0 after a dist.barrier() to ensure all worker processes have completed their initialization before the segment is written.

Why rank 0 handles scheduling and sampling

Scheduling. The scheduler maintains the waiting and running queues and decides which sequences run each step. This is a CPU-side decision that does not need to be replicated across GPUs. Sampling. After the model forward pass, logits on rank 0 are either computed directly (single GPU) or gathered via dist.gather from all ranks. Only rank 0 calls SamplerLayer.forward to convert logits to token IDs:
def run(self, seqs, is_prefill):
    ...
    logits = self.run_model(input_ids, is_prefill)
    token_ids = None
    if self.rank == 0:   # only rank 0 samples
        token_ids = self.sampler(logits, self.prepare_sample(seqs))
    reset_context()
    return token_ids     # None on worker ranks
Worker ranks return None from run. Only rank 0 returns the sampled token IDs to the engine, which then passes them to scheduler.postprocess.

Build docs developers (and LLMs) love