Walrus is a distributed message streaming platform built on a high-performance log storage engine. It provides fault-tolerant streaming with automatic leadership rotation, segment-based partitioning, and Raft consensus for metadata coordination.

System Overview

[Diagram: Walrus architecture]

Producers and consumers connect to any node (or via a load balancer). The cluster automatically routes requests to the appropriate leader and manages segment rollovers for load distribution.

Key Features

  • Automatic load balancing via segment-based leadership rotation
  • Fault tolerance through Raft consensus (3+ nodes)
  • Simple client protocol (connect to any node, auto-forwarding)
  • Sealed segments for historical reads from any replica
  • High-performance storage with io_uring on Linux

Node Internals

[Diagram: Walrus node architecture]

Each node contains four key components that work together to provide distributed streaming:

Node Controller

The central coordination hub that glues together all components. Responsibilities:
  • Routes client requests to appropriate segment leaders
  • Manages write leases (synced from cluster metadata every 100ms)
  • Tracks logical offsets for rollover detection
  • Forwards operations to remote leaders when needed
Key Operations:
  • ensure_topic(topic) - Create topic if it doesn’t exist
  • append_for_topic(topic, data) - Write routing
  • read_one_for_topic(topic, cursor) - Read routing
  • update_leases() - Sync with metadata state
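
As a sketch, these operations map onto a small Rust trait. Only the four operation names come from the list above; the signatures, the Cursor alias, and the error type are assumptions:

type Cursor = u64; // hypothetical opaque read position

trait Controller {
    type Error;

    /// Create the topic via Raft if it does not already exist.
    fn ensure_topic(&self, topic: &str) -> Result<(), Self::Error>;

    /// Route a write to the current segment leader (local or remote).
    fn append_for_topic(&self, topic: &str, data: &[u8]) -> Result<(), Self::Error>;

    /// Route a read for whichever segment the shared cursor points at.
    fn read_one_for_topic(&self, topic: &str, cursor: &mut Cursor) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Reconcile local write leases with cluster metadata (every 100ms).
    fn update_leases(&self);
}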

Raft Engine (Octopii)

Maintains Raft consensus for metadata changes only (not data!).
Key Points:
  • Handles leader election and log replication
  • One node is Raft leader, others are followers
  • Syncs metadata across all nodes via AppendEntries RPCs
  • Metadata only: actual data writes bypass Raft for performance

Cluster Metadata (Raft State Machine)

Replicated state machine that stores cluster topology:
struct ClusterState {
    topics: HashMap<TopicName, TopicState>,
    nodes: HashMap<NodeId, RaftAddress>,
}

struct TopicState {
    current_segment: u64,
    leader_node: NodeId,
    sealed_segments: HashMap<u64, u64>,    // segment_id -> entry_count
    segment_leaders: HashMap<u64, NodeId>, // segment_id -> leader
}
Metadata Commands:
  • CreateTopic { name, initial_leader }
  • RolloverTopic { name, new_leader, sealed_count }
  • UpsertNode { node_id, addr }
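
As a sketch, the command set maps naturally onto a Rust enum; the variant and field names come from the list above, while the concrete field types are assumptions:

type NodeId = u64;

enum MetadataCommand {
    CreateTopic { name: String, initial_leader: NodeId },
    RolloverTopic { name: String, new_leader: NodeId, sealed_count: u64 },
    UpsertNode { node_id: NodeId, addr: String },
}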

Storage Engine (Bucket)

Wraps the Walrus storage engine with lease-based write fencing:
  • Only accepts writes if node holds lease for that segment
  • Stores actual data in WAL files on disk (wal_files/data_plane/)
  • Serves reads from any segment (sealed or active)
  • Uses per-key write locks to prevent concurrent writes
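
A minimal sketch of the fencing check, assuming leases are tracked as a set of "topic:segment" wal_keys (the NotLeaderError name appears in the rollover section below):

use std::collections::HashSet;

struct Bucket {
    leases: HashSet<String>, // wal_keys this node currently leads, e.g. "logs:1"
}

#[derive(Debug)]
struct NotLeaderError;

impl Bucket {
    fn append_by_key(&self, wal_key: &str, _data: &[u8]) -> Result<(), NotLeaderError> {
        // Fence: refuse writes for segments this node does not hold a lease on.
        if !self.leases.contains(wal_key) {
            return Err(NotLeaderError);
        }
        // ...acquire the per-key write lock, then hand off to Walrus for a durable append...
        Ok(())
    }
}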

Component Interactions

Write Path

Client: PUT logs "hello world"

    └─► Client Listener receives request

        ├─► Controller::ensure_topic("logs")
        │   └─► Create topic via Raft if missing

        └─► Controller::append_for_topic("logs", data)

            ├─► Metadata::get_topic_state("logs")
            │   └─► Returns: {current_segment: 1, leader_node: 2}

            ├─► Am I leader? (self.node_id == 2?)
            │   NO: Forward to node 2 via RPC
            │   YES: Proceed locally

            └─► Storage::append_by_key("logs:1", data)
                ├─► ensure_lease("logs:1") ✓
                ├─► Acquire per-key mutex
                └─► Walrus::batch_append_for_topic (durable write)
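
Condensed into Rust, the routing decision looks roughly like this; forward_append and local_append are stand-ins for the RPC and storage calls, and TopicState is narrowed to the two metadata fields used here (a sketch, not the actual implementation):

struct TopicState { current_segment: u64, leader_node: u64 }

fn append_for_topic(self_id: u64, topic: &str, state: &TopicState, data: &[u8]) -> Result<(), String> {
    // Derive the wal_key for the active segment, e.g. "logs:1".
    let wal_key = format!("{}:{}", topic, state.current_segment);
    if state.leader_node != self_id {
        // Not the leader for this segment: forward the append over RPC.
        return forward_append(state.leader_node, &wal_key, data);
    }
    // Leader path: lease check, per-key mutex, then a durable Walrus append.
    local_append(&wal_key, data)
}

fn forward_append(_leader: u64, _wal_key: &str, _data: &[u8]) -> Result<(), String> { Ok(()) }
fn local_append(_wal_key: &str, _data: &[u8]) -> Result<(), String> { Ok(()) }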

Read Path with Cursor Advancement

Client: GET logs

    └─► Controller::read_one_for_topic("logs")

        ├─► Fetch shared cursor: {segment: 1, delivered: 5}

        ├─► Metadata::get_topic_state("logs")
        │   └─► sealed_segments: {1 → 1,000,000}
        │       current_segment: 2

        ├─► Is segment 1 sealed? YES
        │   Is 5 < 1,000,000? YES, more data available

        ├─► Determine leader for segment 1 → node 2

        ├─► Forward to node 2 (if remote)
        │   └─► Storage::read_one("logs:1")
        │       └─► Walrus::read_next("logs:1", advance=true)

        ├─► Got data? YES
        │   ├─► cursor.delivered += 1 (now 6)
        │   └─► Return Some(entry)

        └─► If segment exhausted (delivered >= sealed_count):
            ├─► cursor.segment += 1 (advance to segment 2)
            ├─► cursor.delivered = 0
            └─► LOOP (try next segment)
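
The cursor-advancement loop can be sketched as follows; the two closures stand in for metadata and storage lookups (assumptions, not the real API):

struct Cursor { segment: u64, delivered: u64 }

fn read_one(
    cursor: &mut Cursor,
    sealed_count_of: impl Fn(u64) -> Option<u64>,       // segment_id -> entry count, if sealed
    read_from_segment: impl Fn(u64) -> Option<Vec<u8>>, // next entry from a segment's leader
) -> Option<Vec<u8>> {
    loop {
        if let Some(entry) = read_from_segment(cursor.segment) {
            cursor.delivered += 1; // e.g. 5 -> 6
            return Some(entry);
        }
        match sealed_count_of(cursor.segment) {
            // Sealed and fully delivered: advance to the next segment and retry.
            Some(sealed) if cursor.delivered >= sealed => {
                cursor.segment += 1;
                cursor.delivered = 0;
            }
            // Active segment (or sealed but not yet exhausted): no data right now.
            _ => return None,
        }
    }
}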

Lease Synchronization

Leases prevent split-brain writes by ensuring only the designated leader can write to each segment.
Lease Sync Flow (every 100ms):
Controller::update_leases()

    ├─► Metadata::owned_topics(self.node_id)
    │   └─► Query local Raft state replica
    │       Example: [("logs", 1), ("metrics", 3)]

    ├─► Convert to wal_keys: ["logs:1", "metrics:3"]

    └─► Storage::update_leases(expected_set)
        ├─► Compare current vs expected
        ├─► Add missing leases
        └─► Remove stale leases
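
The reconciliation at the end of this flow is essentially a set diff. A minimal sketch, assuming leases are tracked as plain wal_key strings:

use std::collections::HashSet;

fn update_leases(current: &mut HashSet<String>, expected: HashSet<String>) {
    // Drop stale leases, e.g. segments whose leadership rolled over to another node.
    current.retain(|key| expected.contains(key));
    // Grant leases for newly owned segments, e.g. ["logs:1", "metrics:3"].
    current.extend(expected);
}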
After Rollover:
  • Old leader: Next update_leases() removes lease → writes fail
  • New leader: Next update_leases() grants lease → accepts writes
  • Window: ≤100ms until lease sync completes

Metadata Replication via Raft

All cluster state changes flow through Raft consensus:
Controller::ensure_topic("metrics")

    └─► propose_metadata(CreateTopic{name: "metrics", leader: 3})

        ├─► Serialize command: bincode(CreateTopic{...})

        └─► Raft::propose(bytes)
            ├─► Append to local Raft log
            ├─► Send AppendEntries RPC to followers
            │   ├─► Node 2: append, ACK
            │   └─► Node 3: append, ACK
            ├─► Wait for quorum (2 of 3 nodes)
            ├─► Commit entry (advance commit_index)
            └─► Apply to state machine on ALL nodes
                └─► Metadata::apply(command_bytes)
                    ├─► Deserialize CreateTopic
                    └─► state.topics.insert("metrics", ...)
All nodes now have a consistent view of topics and segment leaders.
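
A sketch of the apply step, assuming serde derives and bincode 1.x (the flow above confirms commands are bincode-serialized); the topic-to-leader map is a simplified stand-in for the full ClusterState:

use std::collections::HashMap;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
enum MetadataCommand {
    CreateTopic { name: String, initial_leader: u64 },
    // ...other commands elided...
}

fn apply(state: &mut HashMap<String, u64>, bytes: &[u8]) {
    // Committed entries are applied in log order on every node, so all
    // replicas of the state machine converge on the same view.
    match bincode::deserialize::<MetadataCommand>(bytes).expect("valid command") {
        MetadataCommand::CreateTopic { name, initial_leader } => {
            state.insert(name, initial_leader);
        }
    }
}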

Automatic Segment Rollover

The monitor loop orchestrates leadership rotation when segments reach capacity.
Monitor Flow (every 10s):
Monitor::tick()

    └─► For each owned segment:
        ├─► Check: entry_count >= 1,000,000?
        │   YES → TRIGGER ROLLOVER

        ├─► Select next leader (round-robin from Raft voters)
        │   Example: voters=[1,2,3], current=node 2
        │            next = (1+1) % 3 = 2 → voters[2] = node 3

        └─► propose_metadata(RolloverTopic {
              name: "logs",
              new_leader: 3,
              sealed_segment_entry_count: 1,000,050
            })
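
The leader-selection step is small enough to show runnably; this sketch matches the worked example above:

fn next_leader(voters: &[u64], current: u64) -> u64 {
    let idx = voters.iter().position(|&v| v == current).unwrap_or(0);
    voters[(idx + 1) % voters.len()]
}

fn main() {
    assert_eq!(next_leader(&[1, 2, 3], 2), 3); // voters[(1 + 1) % 3] = voters[2]
    assert_eq!(next_leader(&[1, 2, 3], 3), 1); // wraps back to the first voter
}
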
Rollover Effect:
  1. Metadata State Updated:
    • sealed_segments[1] = 1,000,050 (preserve old segment count)
    • segment_leaders[1] = 2 (preserve old leader for reads)
    • current_segment = 2 (new active segment)
    • leader_node = 3 (new writes go to node 3)
    • segment_leaders[2] = 3
  2. Lease Transfer (within 100ms):
    • Node 2 (old leader):
      • Loses lease for logs:1
      • Future writes → NotLeaderError
      • But sealed data still readable!
    • Node 3 (new leader):
      • Gains lease for logs:2
      • Starts accepting writes to new segment
  3. Client Routing:
    • Writes: PUT logs "data" → Routes to node 3, appends to logs:2
    • Reads:
      • Cursor on segment 1? → Routes to node 2
      • Cursor on segment 2? → Routes to node 3
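
A sketch of the metadata mutation in step 1, reusing the TopicState fields shown earlier (illustrative, not the actual code):

use std::collections::HashMap;

struct TopicState {
    current_segment: u64,
    leader_node: u64,
    sealed_segments: HashMap<u64, u64>,
    segment_leaders: HashMap<u64, u64>,
}

fn apply_rollover(t: &mut TopicState, new_leader: u64, sealed_count: u64) {
    // Preserve the old segment's entry count and leader so reads keep working.
    t.sealed_segments.insert(t.current_segment, sealed_count);
    t.segment_leaders.insert(t.current_segment, t.leader_node);
    // Open the next segment under the new leader.
    t.current_segment += 1;
    t.leader_node = new_leader;
    t.segment_leaders.insert(t.current_segment, new_leader);
}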

Storage Engine Architecture

The underlying Walrus storage engine is optimized for high-throughput streaming:

Layered Design

  • API Layer: append_for_topic, batch_append_for_topic, batch_read_for_topic
  • Core Engine: BlockAllocator, Writer, Reader, ReadOffsetIndex
  • Storage Backends: Mmap backend or FD backend with io_uring (Linux)
  • Durable Media: WAL files in wal_files/<namespace>/

Storage Layout

  • Files: Pre-allocated to 1GB (100 × 10MB blocks), named with millisecond timestamps
  • Blocks: Fixed 10MB logical segments tracked by BlockAllocator
  • Metadata prefix: Every entry stores a 64-byte prefix (topic, size, FNV-1a checksum; see the sketch after this list)
  • Index files: <topic>_index.db files store read cursors for recovery
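
For reference, the 64-bit FNV-1a checksum named above takes only a few lines of Rust. The constants are the standard FNV-1a offset basis and prime; exactly which bytes Walrus feeds the hash is not specified here.

fn fnv1a_64(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV-1a 64-bit offset basis
    for &byte in data {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV-1a 64-bit prime
    }
    hash
}

fn main() {
    // Example: checksum an entry payload before writing its metadata prefix.
    println!("{:#018x}", fnv1a_64(b"hello world"));
}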

Performance Characteristics

  • Block size: 10MB per block
  • File size: 1GB (100 blocks)
  • Batch limits: Up to 2,000 entries or ~10GB payload per batch
  • Default fsync interval: 200ms
  • io_uring: Automatic on Linux with FD backend for batch operations

Configuration

Environment Variables

Variable                   | Default     | Description
---------------------------|-------------|------------------------------
WALRUS_MAX_SEGMENT_ENTRIES | 1000000     | Entries before rollover
WALRUS_MONITOR_CHECK_MS    | 10000       | Monitor loop interval (ms)
WALRUS_DISABLE_IO_URING    | -           | Use mmap instead of io_uring
WALRUS_DATA_DIR            | ./wal_files | Storage location
RUST_LOG                   | info        | Log level

CLI Flags

Flag          | Default    | Description
--------------|------------|----------------------------------
--node-id     | (required) | Unique node identifier
--data-dir    | ./data     | Root directory for storage
--raft-port   | 6000       | Raft/internal RPC port
--client-port | 8080       | Client TCP port
--join        | -          | Address of existing node to join

Performance

  • Write throughput: a single lease-holding writer per segment; leadership rotation spreads write load across nodes
  • Read throughput: scales with replicas, since sealed segments can be read from any replica
  • Latency: ~1-2 RTT for forwarded ops + storage latency
  • Consensus overhead: Metadata only (not data path)
  • Segment rollover: ~1M entries default (~100MB depending on payload size)
The distributed architecture separates the data plane (high-throughput writes to Walrus) from the control plane (Raft consensus for metadata). This design ensures that write performance is not bottlenecked by consensus overhead.

Topics and Segments

Learn how topics are partitioned into segments and how rollover works

Raft Consensus

Deep dive into how Raft coordinates metadata across nodes

Consistency Models

Understand read consistency and fsync scheduling options
