MiniVLLM uses tensor parallelism to distribute a single model across multiple GPUs. Each GPU holds a shard of every weight matrix and cooperates with the other GPUs using NCCL collectives.
How tensor parallelism works
A standard linear layer computes Y = X Wᵀ, where W is stored PyTorch-style with shape (output_features, input_features). With tensor parallelism the weight matrix W is split across GPUs and the computation is distributed in one of two ways.
Column parallel: W is split along dim=0 (output features). Each GPU computes a slice of the output independently, so no communication is needed during the forward pass.
GPU 0: Y₀ = X W₀ᵀ   # first half of output features
GPU 1: Y₁ = X W₁ᵀ   # second half of output features
Used for: Q, K, V projections and MLP gate/up projections.
Row parallel: W is split along dim=1 (input features), and the input is split the same way along its last dimension (X₀, X₁). Each GPU computes a partial product that must be summed across all GPUs with dist.all_reduce.
GPU 0: partial₀ = X₀ W₀ᵀ
GPU 1: partial₁ = X₁ W₁ᵀ
# After all_reduce:
Y = partial₀ + partial₁   # full output on every GPU
Used for: attention output projection (o_proj) and MLP down projection.
The two types are always paired: a column-parallel layer shards the output, which becomes the sharded input consumed by the following row-parallel layer. Within each attention or MLP block, the only cross-GPU communication is therefore the dist.all_reduce at the end of the row-parallel layer.
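The pairing can be checked on the CPU with a small pure-Python sketch. It is a simplified, hypothetical model rather than MiniVLLM code: the MLP is reduced to a single up projection (column parallel), a ReLU, and a down projection (row parallel), and a plain sum over ranks stands in for dist.all_reduce.

```python
# Hypothetical CPU-only check of the column -> row pairing (no torch, no NCCL).
# Matrices are lists of rows; weights use PyTorch's (out_features, in_features) layout.

def linear(x, w):
    """Compute x @ w.T for x: (n, in) and w: (out, in)."""
    return [[sum(xi * wi for xi, wi in zip(row, w_row)) for w_row in w] for row in x]

def relu(x):
    return [[max(v, 0) for v in row] for row in x]

def add(a, b):
    return [[u + v for u, v in zip(ra, rb)] for ra, rb in zip(a, b)]

def mlp_full(x, w_up, w_down):
    return linear(relu(linear(x, w_up)), w_down)

def mlp_tensor_parallel(x, w_up, w_down, tp_size):
    shard = len(w_up) // tp_size                   # intermediate features per rank
    partials = []
    for rank in range(tp_size):
        lo, hi = rank * shard, (rank + 1) * shard
        w_up_r = w_up[lo:hi]                       # column parallel: split dim 0
        w_down_r = [row[lo:hi] for row in w_down]  # row parallel: split dim 1
        h_r = relu(linear(x, w_up_r))              # local slice, no communication
        partials.append(linear(h_r, w_down_r))     # partial sum of the full output
    out = partials[0]
    for p in partials[1:]:
        out = add(out, p)                          # stands in for dist.all_reduce(SUM)
    return out

x = [[1, -2, 3], [0, 1, -1]]                           # (2 tokens, hidden=3)
w_up = [[1, 0, 2], [-1, 1, 0], [0, 2, 1], [3, -1, 1]]  # (inter=4, hidden=3)
w_down = [[1, 2, 0, -1], [0, 1, 1, 2], [2, 0, -1, 1]]  # (hidden=3, inter=4)

print(mlp_tensor_parallel(x, w_up, w_down, tp_size=2) == mlp_full(x, w_up, w_down))  # True
```

ReLU is elementwise, so each rank can apply it to its own slice of the intermediate activations; any elementwise activation works the same way, which is why nothing has to be exchanged between the two layers.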
Linear layer variants
ColumnParallelLinear
Each GPU stores output_size // tp_size output rows. The weight loader extracts the shard corresponding to the current rank.
import torch.distributed as dist
from torch import nn

class ColumnParallelLinear(LinearBase):
    def __init__(self, input_size: int, output_size: int, bias: bool = True):
        tp_size = dist.get_world_size()
        # Weight shape per GPU: (output_size // tp_size, input_size)
        super().__init__(input_size, output_size // tp_size, bias, tp_dim=0)

    def weight_loader(self, param, loaded_weights):
        # Copy this rank's contiguous block of output rows (also works for the 1-D bias)
        shard_size = loaded_weights.size(0) // self.tp_size
        start = self.tp_rank * shard_size
        param.data.copy_(loaded_weights.narrow(0, start, shard_size))

    def forward(self, x):
        # No communication: each GPU produces its own slice of the output features
        return nn.functional.linear(x, self.weight, self.bias)
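The loader's narrow(0, start, shard_size) call selects a contiguous block of rows. A hypothetical pure-Python model of that slicing (lists of rows in place of tensors) shows that the per-rank shards tile the full weight:

```python
# Hypothetical model of ColumnParallelLinear.weight_loader's slicing:
# Tensor.narrow(0, start, length) on an (out, in) weight selects rows[start:start + length].

def column_shard(weight, tp_rank, tp_size):
    shard_size = len(weight) // tp_size
    start = tp_rank * shard_size
    return weight[start:start + shard_size]

full = [[r * 10 + c for c in range(3)] for r in range(4)]  # (out=4, in=3)
shards = [column_shard(full, rank, tp_size=2) for rank in range(2)]

print(shards[0])                     # [[0, 1, 2], [10, 11, 12]]
print(shards[1])                     # [[20, 21, 22], [30, 31, 32]]
print(shards[0] + shards[1] == full)  # True: concatenating along dim 0 restores the weight
```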
RowParallelLinear
Each GPU stores input_size // tp_size input columns. A dist.all_reduce sums partial results after the matrix multiply.
class RowParallelLinear(LinearBase):
    def __init__(self, input_size: int, output_size: int, bias: bool = True):
        tp_size = dist.get_world_size()
        # Weight shape per GPU: (output_size, input_size // tp_size)
        super().__init__(input_size // tp_size, output_size, bias, tp_dim=1)

    def weight_loader(self, param, loaded_weights):
        if param.dim() == 1:
            # The bias is not sharded: every rank keeps the full vector
            param.data.copy_(loaded_weights)
            return
        shard_size = loaded_weights.size(1) // self.tp_size
        start = self.tp_rank * shard_size
        param.data.copy_(loaded_weights.narrow(1, start, shard_size))

    def forward(self, x):
        # Add the bias on rank 0 only; otherwise all_reduce would sum it tp_size times
        bias = self.bias if self.tp_rank == 0 else None
        result = nn.functional.linear(x, self.weight, bias)
        if self.tp_size > 1:
            dist.all_reduce(result, op=dist.ReduceOp.SUM)
        return result  # full output, replicated on all GPUs
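Because all_reduce sums the per-rank outputs, anything each rank adds before the reduction is counted tp_size times. The hypothetical sketch below uses that to check where a row-parallel bias belongs: adding it on a single rank (rank 0 here) reproduces the unsharded result, while adding it everywhere does not.

```python
# Hypothetical check of bias placement in a row-parallel layer (one token, no torch).
# Each rank holds a dim-1 shard of W; summing over ranks stands in for all_reduce(SUM).

def row_parallel(x, w, bias, tp_size, bias_on_every_rank):
    shard = len(x) // tp_size
    out = [0] * len(w)
    for rank in range(tp_size):
        lo, hi = rank * shard, (rank + 1) * shard
        for o, w_row in enumerate(w):
            partial = sum(xi * wi for xi, wi in zip(x[lo:hi], w_row[lo:hi]))
            if bias_on_every_rank or rank == 0:
                partial += bias[o]
            out[o] += partial  # all_reduce(SUM)
    return out

x = [1, 2, 3, 4]                  # in_features = 4
w = [[1, 0, 1, 0], [0, 1, 0, 1]]  # (out=2, in=4)
b = [10, 20]
reference = [sum(xi * wi for xi, wi in zip(x, row)) + bo for row, bo in zip(w, b)]

print(reference)                                          # [14, 26]
print(row_parallel(x, w, b, 2, bias_on_every_rank=True))   # [24, 46]: bias counted twice
print(row_parallel(x, w, b, 2, bias_on_every_rank=False))  # [14, 26]
```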
QKVColumnParallelLinear
For attention, the column split must respect head boundaries. Each GPU handles complete attention heads rather than fractional ones:
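class QKVColumnParallelLinear(ColumnParallelLinear):
    def __init__(
        self,
        input_size: int,
        head_size: int,
        num_heads: int,
        num_kv_heads: int | None = None,
        bias: bool = False,
    ):
        tp_size = dist.get_world_size()
        num_kv_heads = num_kv_heads or num_heads  # default: one KV head per Q head
        assert num_heads % tp_size == 0 and num_kv_heads % tp_size == 0
        self.num_heads = num_heads // tp_size        # per-GPU Q heads
        self.num_kv_heads = num_kv_heads // tp_size  # per-GPU KV heads
        # Global Q + K + V width; ColumnParallelLinear divides it by tp_size
        total_output = head_size * (num_heads + 2 * num_kv_heads)
        super().__init__(input_size, total_output, bias=bias)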
The weight loader accepts load_weight_id ('q', 'k', or 'v') to identify which sub-matrix is being loaded and computes the correct offset within the merged QKV tensor.
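The offset arithmetic can be sketched as follows, assuming the per-rank merged tensor stores its Q, K, and V rows contiguously in that order. The function name and its return triple are hypothetical; in PyTorch the triple would typically feed param.data.narrow(0, dst_offset, shard_size).copy_(loaded.narrow(0, src_start, shard_size)).

```python
# Hypothetical sketch: where q/k/v land in the per-rank merged QKV weight,
# assuming the local layout [Q | K | V] along dim 0 (rows).

def qkv_shard(load_weight_id, head_size, num_heads, num_kv_heads, tp_rank, tp_size):
    q_rows = (num_heads // tp_size) * head_size      # local Q rows
    kv_rows = (num_kv_heads // tp_size) * head_size  # local K rows (same for V)
    if load_weight_id == 'q':
        dst_offset, shard_size = 0, q_rows
    elif load_weight_id == 'k':
        dst_offset, shard_size = q_rows, kv_rows
    elif load_weight_id == 'v':
        dst_offset, shard_size = q_rows + kv_rows, kv_rows
    else:
        raise ValueError(f"unknown load_weight_id: {load_weight_id!r}")
    # shard_size is a whole number of heads, so no head is split across GPUs
    src_start = tp_rank * shard_size
    return dst_offset, shard_size, src_start

# Example: 16 Q heads, 8 KV heads, head_size 128, two GPUs, rank 1
for wid in ('q', 'k', 'v'):
    print(wid, qkv_shard(wid, 128, 16, 8, tp_rank=1, tp_size=2))
# q (0, 1024, 1024)
# k (1024, 512, 512)
# v (1536, 512, 512)
```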
MergedColumnParallelLinear
Holds the gate and up MLP projections in a single weight tensor. The weight loader is called once per sub-matrix with a loaded_weight_id integer (0 for gate, 1 for up):
self.gate_up = MergedColumnParallelLinear(
    input_size=hidden_size,
    output_sizes=[intermediate_size, intermediate_size],
)
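The same idea applies to the merged gate/up tensor. In this hypothetical sketch, each rank's local tensor holds its gate shard followed by its up shard, and loaded_weight_id (0 or 1) selects which sub-matrix is being copied:

```python
# Hypothetical sketch of MergedColumnParallelLinear's offset arithmetic.
# output_sizes are global sizes; each rank stores [gate shard | up shard] along dim 0.

def merged_shard(loaded_weight_id, output_sizes, tp_rank, tp_size):
    dst_offset = sum(output_sizes[:loaded_weight_id]) // tp_size  # local rows before it
    shard_size = output_sizes[loaded_weight_id] // tp_size
    src_start = tp_rank * shard_size                              # rows in the full weight
    return dst_offset, shard_size, src_start

intermediate_size = 3072  # example value
sizes = [intermediate_size, intermediate_size]
for rank in range(2):
    print(rank, merged_shard(0, sizes, rank, 2), merged_shard(1, sizes, rank, 2))
# 0 (0, 1536, 0) (1536, 1536, 0)
# 1 (0, 1536, 1536) (1536, 1536, 1536)
```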
Data flow through an attention block
Input x: (batch, seq, hidden_size) — replicated on all GPUs
│
▼
QKVColumnParallelLinear — column parallel: no communication
│ each GPU computes its Q/K/V head slice
▼
q_norm, k_norm, RotaryEmbedding — local ops: no communication
│
▼
Attention (flash / paged) — local ops: each GPU attends its own heads
│ o: (batch, num_heads_per_gpu * head_dim)
▼
RowParallelLinear (o_proj) — row parallel: dist.all_reduce sums partials
│
▼
Output: (batch, hidden_size) — replicated on all GPUs
Enabling multi-GPU inference
Change world_size in the config dict passed to LLMEngine:
config = {
    "model_name_or_path": "/path/to/Qwen3-0.6B",
    "world_size": 2,  # set to the number of GPUs
    "block_size": 256,
    # ... other model parameters
}
engine = LLMEngine(config)
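All of num_heads, num_kv_heads, and vocab_size must be divisible by world_size; the model layers assert this at construction time. The MLP intermediate_size is sharded the same way (by the gate/up and down projections), so it must divide evenly as well.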
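A pre-flight check along these lines can catch a bad world_size before any process is spawned. It is a hypothetical helper, not part of MiniVLLM, and the example sizes are illustrative; it also checks intermediate_size, since the MLP projections are sharded by world_size too.

```python
# Hypothetical pre-flight check: every sharded dimension must divide by world_size.

def per_rank_sizes(num_heads, num_kv_heads, vocab_size, intermediate_size, world_size):
    sizes = {
        "num_heads": num_heads,
        "num_kv_heads": num_kv_heads,
        "vocab_size": vocab_size,
        "intermediate_size": intermediate_size,
    }
    bad = [name for name, value in sizes.items() if value % world_size]
    if bad:
        raise ValueError(f"not divisible by world_size={world_size}: {', '.join(bad)}")
    return {name: value // world_size for name, value in sizes.items()}

print(per_rank_sizes(16, 8, 32000, 3072, world_size=2))
# {'num_heads': 8, 'num_kv_heads': 4, 'vocab_size': 16000, 'intermediate_size': 1536}
```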
Process group initialization
Every process (rank 0 and workers) calls dist.init_process_group inside ModelRunner.__init__:
class ModelRunner:
    def __init__(self, config, rank, event):
        dist.init_process_group(
            'nccl',
            'tcp://localhost:12345',
            world_size=config['world_size'],
            rank=rank,
        )
        torch.cuda.set_device(rank)
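dist.init_process_group acts as a rendezvous: with the tcp:// init method, each rank blocks until all world_size ranks have joined, so rank 0 cannot continue until every worker has reached this call. This guarantees that every rank is a member of the process group before any collective operation (e.g. all_reduce, all_gather) is issued. With the NCCL backend, PyTorch typically creates the NCCL communicators themselves lazily, on the first collective.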
Worker process lifecycle
[LLMEngine.__init__]
│
├─ spawn worker processes (rank 1 … N)
│ └─ worker_process(config, rank, event)
│ └─ ModelRunner(config, rank, event) ← blocks at dist.init_process_group
│ └─ model_runner.loop() ← enters event loop
│
└─ ModelRunner(config, rank=0, events) ← blocks at dist.init_process_group
│ (all ranks joined — barrier clears)
├─ warmup_model()
├─ allocate_kv_cache() ← dist.all_reduce(MIN) for block count
├─ capture_cudagraph()
└─ dist.barrier() → create SharedMemory
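The all_reduce(MIN) step matters because free GPU memory can differ slightly between ranks, while every rank presumably needs the same number of KV-cache blocks (the block IDs assigned by the scheduler on rank 0 must be valid on every GPU). A hypothetical sketch of the arithmetic, with an assumed per-block size formula (K and V, for every layer, for each rank's share of the KV heads) and made-up layer count and memory figures:

```python
# Hypothetical KV-cache sizing: each rank computes how many blocks fit, then all
# ranks adopt the smallest count, which is what all_reduce(..., op=ReduceOp.MIN) yields.

def kv_block_bytes(num_layers, block_size, num_kv_heads_per_gpu, head_dim, dtype_bytes):
    # factor 2 = one K and one V entry per token slot, per layer
    return 2 * num_layers * block_size * num_kv_heads_per_gpu * head_dim * dtype_bytes

block_bytes = kv_block_bytes(num_layers=28, block_size=256, num_kv_heads_per_gpu=4,
                             head_dim=128, dtype_bytes=2)
free_per_rank = [20 * 2**30, 18 * 2**30]          # e.g. rank 1 has less free memory
local = [free // block_bytes for free in free_per_rank]
agreed = min(local)                                # the MIN reduction, seen by every rank

print(block_bytes, local, agreed)  # 14680064 [1462, 1316] 1316
```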
After initialization, the worker loop runs:
def loop(self):  # workers only
    while True:
        method_name, args = self.read_shm()  # block on event
        self.call(method_name, *args)        # execute (e.g. "run" or "exit")
        if method_name == 'exit':
            break  # call() has already run self.exit(); just leave the loop
Shared memory communication
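Rank 0 (the scheduler process) sends commands to the worker ranks through a POSIX shared memory segment named myvllm. A command is an ordinary Python object (a method name plus arguments such as the scheduled sequences) that is pickled into the segment. NCCL collectives, by contrast, operate on GPU tensors, so shared memory is the simpler channel for this control traffic between processes on the same machine.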
Rank 0 writes a call
def write_shm(self, method_name: str, args: tuple):
    data = pickle.dumps((method_name, *args))
    n = len(data)
    self.shm.buf[:4] = n.to_bytes(4, 'little')  # 4-byte length prefix
    self.shm.buf[4:n+4] = data
    for event in self.events:  # one event per worker
        event.set()            # signal each worker
Worker reads the call
def read_shm(self):
    self.event.wait()  # block until rank 0 signals
    n = int.from_bytes(self.shm.buf[:4], 'little')
    method_name, *args = pickle.loads(self.shm.buf[4:n+4])
    self.event.clear()  # reset for next message
    return method_name, args
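The length-prefixed protocol can be exercised in a single process with the standard library's multiprocessing.shared_memory module. This is a sketch rather than MiniVLLM's code: writer and reader share one Event, the segment gets an auto-generated name instead of myvllm, and an explicit size check gives a clear error for messages that do not fit.

```python
# Single-process sketch of the length-prefixed pickle protocol (stdlib only).
import pickle
from multiprocessing import Event
from multiprocessing.shared_memory import SharedMemory

def write_msg(shm, event, method_name, *args):
    data = pickle.dumps((method_name, *args))
    n = len(data)
    if n + 4 > shm.size:
        raise ValueError("message larger than the shared memory segment")
    shm.buf[:4] = n.to_bytes(4, 'little')  # 4-byte length prefix
    shm.buf[4:n + 4] = data
    event.set()                            # wake the reader

def read_msg(shm, event):
    event.wait()                           # block until the writer signals
    n = int.from_bytes(shm.buf[:4], 'little')
    method_name, *args = pickle.loads(bytes(shm.buf[4:n + 4]))
    event.clear()                          # ready for the next message
    return method_name, args

shm = SharedMemory(create=True, size=1 << 16)
event = Event()
try:
    write_msg(shm, event, 'run', [1, 2, 3], True)
    print(read_msg(shm, event))  # ('run', [[1, 2, 3], True])
finally:
    shm.close()
    shm.unlink()
```

Across processes, rank 0 would create the segment with create=True and each worker would attach to it with SharedMemory(name=...), both standard SharedMemory parameters.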
Worker executes the method
# Inside loop()
self.call(method_name, *args)
# call() → getattr(self, method_name)(*args)
# e.g. self.run(seqs, is_prefill)
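The dispatch can be simulated without GPUs or shared memory. In this hypothetical stand-in, messages come from a list and run()/exit() only record that they were called; it shows that call() reaches exit() through getattr, so the loop only needs to break afterwards.

```python
# Hypothetical stand-in for the worker loop: a list replaces shared memory,
# and run()/exit() just log their calls.

class FakeRunner:
    def __init__(self, messages):
        self.messages = iter(messages)
        self.log = []

    def read_shm(self):
        return next(self.messages)

    def call(self, method_name, *args):
        return getattr(self, method_name)(*args)  # e.g. self.run(seqs, is_prefill)

    def run(self, seqs, is_prefill):
        self.log.append(('run', seqs, is_prefill))

    def exit(self):
        self.log.append(('exit',))

    def loop(self):
        while True:
            method_name, args = self.read_shm()
            self.call(method_name, *args)
            if method_name == 'exit':
                break  # exit() already ran via call()

runner = FakeRunner([('run', [['seq0'], True]), ('run', [['seq0'], False]), ('exit', [])])
runner.loop()
print(runner.log)  # [('run', ['seq0'], True), ('run', ['seq0'], False), ('exit',)]
```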
Both rank 0 and workers call self.run(...). Because every rank executes the same model code on the same inputs (each with its own weight shard), every rank issues the same collective operations (all_reduce, etc.) in the same order, so they execute in lock-step across ranks.
The shared memory segment is created by rank 0 after a dist.barrier() to ensure all worker processes have completed their initialization before the segment is written.
Why rank 0 handles scheduling and sampling
Scheduling. The scheduler maintains the waiting and running queues and decides which sequences run each step. This is a CPU-side decision that does not need to be replicated across GPUs.
Sampling. After the model forward pass, logits on rank 0 are either computed directly (single GPU) or gathered via dist.gather from all ranks. Only rank 0 calls SamplerLayer.forward to convert logits to token IDs:
def run(self, seqs, is_prefill):
    ...
    logits = self.run_model(input_ids, is_prefill)
    token_ids = None
    if self.rank == 0:  # only rank 0 samples
        token_ids = self.sampler(logits, self.prepare_sample(seqs))
    reset_context()
    return token_ids  # None on worker ranks
Worker ranks return None from run. Only rank 0 returns the sampled token IDs to the engine, which then passes them to scheduler.postprocess.
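Assuming the LM head is split along the vocabulary in contiguous slices (which the vocab_size divisibility requirement suggests), rank 0 has to concatenate the gathered slices in rank order before sampling. A hypothetical pure-Python sketch, with greedy argmax standing in for SamplerLayer, whose actual sampling rule may differ:

```python
# Hypothetical sketch: rebuild full-vocabulary logits from per-rank slices, then sample.

def gather_vocab_shards(shards_per_rank):
    # shards_per_rank[r][i] = logits of sequence i over rank r's vocabulary slice
    num_seqs = len(shards_per_rank[0])
    return [sum((shard[i] for shard in shards_per_rank), []) for i in range(num_seqs)]

def greedy_sample(logits):
    # greedy stand-in for the real sampler: highest-scoring token id per sequence
    return [max(range(len(row)), key=row.__getitem__) for row in logits]

# vocab_size = 6, world_size = 2: rank 0 owns ids 0-2, rank 1 owns ids 3-5
rank0 = [[0.1, 2.0, 0.3], [1.5, 0.0, 0.2]]
rank1 = [[0.5, 0.4, 3.1], [0.9, 1.2, 0.7]]
full = gather_vocab_shards([rank0, rank1])
print(greedy_sample(full))  # [5, 0]
```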