

Overview

The Metadata component maintains the cluster’s shared state through Raft consensus. It tracks all topics, their segments, leadership assignments, and node addresses. This state is replicated across all nodes and serves as the source of truth for routing decisions. Location: distributed-walrus/src/metadata.rs

Core Data Structures

ClusterState

The top-level replicated state container. Location: metadata.rs:11
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterState {
    pub topics: HashMap<TopicName, TopicState>,
    pub nodes: HashMap<NodeId, String>,
}
Field    Type                             Description
topics   HashMap<TopicName, TopicState>   All topics and their current state
nodes    HashMap<NodeId, String>          Node ID to Raft/RPC address mappings

TopicState

Comprehensive state for a single topic, including segment history and leadership. Location: metadata.rs:18
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicState {
    pub current_segment: u64,
    pub leader_node: NodeId,
    pub last_sealed_entry_offset: u64,
    pub sealed_segments: HashMap<u64, u64>,
    pub segment_leaders: HashMap<u64, NodeId>,
}

Fields

current_segment (u64)
  The active segment number where new writes are directed. Starts at 1 when a topic is created.

leader_node (NodeId)
  The node ID currently responsible for handling writes to the current segment.

last_sealed_entry_offset (u64, default: 0)
  Cumulative count of entries across all sealed segments. Used for calculating global logical offsets.

sealed_segments (HashMap<u64, u64>, default: {})
  Maps sealed segment IDs to their entry counts. A segment becomes sealed when rolling over to a new segment.

segment_leaders (HashMap<u64, NodeId>, default: {})
  Historical record of which node was the leader for each segment. Used to route reads to the correct node.

Type Aliases

pub type NodeId = u64;
pub type TopicName = String;

Metadata Commands

Commands that modify cluster state through Raft consensus. Location: metadata.rs:33
#[derive(Debug, Serialize, Deserialize)]
pub enum MetadataCmd {
    CreateTopic { name: String, initial_leader: NodeId },
    RolloverTopic { name: String, new_leader: NodeId, sealed_segment_entry_count: u64 },
    UpsertNode { node_id: NodeId, addr: String },
}

CreateTopic

Creates a new topic with an initial leader assignment. Fields:
  • name - Topic name (must be unique)
  • initial_leader - Node ID that will lead segment 1
State Changes:
  1. Inserts new TopicState with current_segment = 1
  2. Records initial_leader as both leader_node and in segment_leaders[1]
  3. Returns "CREATED" if successful, "EXISTS" if topic already exists
Location: metadata.rs:125
Topic names are hashed deterministically to select an initial leader, ensuring even distribution across the cluster.
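
The CreateTopic arm of apply plausibly looks like the following; this is a sketch reconstructed from the state changes above, not the exact code at metadata.rs:125.
// Inside Metadata::apply, after deserializing the command (sketch).
MetadataCmd::CreateTopic { name, initial_leader } => {
    let mut state = self.state.write().map_err(|e| e.to_string())?;
    if state.topics.contains_key(&name) {
        return Ok(Bytes::from("EXISTS"));           // topic already exists
    }
    let mut segment_leaders = HashMap::new();
    segment_leaders.insert(1, initial_leader);       // node that leads segment 1
    state.topics.insert(name, TopicState {
        current_segment: 1,
        leader_node: initial_leader,
        last_sealed_entry_offset: 0,
        sealed_segments: HashMap::new(),
        segment_leaders,
    });
    Ok(Bytes::from("CREATED"))
}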

RolloverTopic

Seals the current segment and creates a new one with a new leader. Fields:
  • name - Topic name
  • new_leader - Node ID for the new segment
  • sealed_segment_entry_count - Number of entries in the segment being sealed
State Changes:
  1. Seals current segment by adding to sealed_segments map
  2. Records old leader in segment_leaders for the sealed segment
  3. Increments last_sealed_entry_offset by the sealed count
  4. Increments current_segment
  5. Sets leader_node to new_leader
  6. Records new leader in segment_leaders for new segment
  7. Returns "ROLLED" if successful, or an error if the topic is not found
Location: metadata.rs:144
Rollover operations must accurately record the sealed segment’s entry count. This count is critical for cursor advancement during reads.
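
A sketch of the corresponding RolloverTopic arm, mirroring state changes 1-7 above (the real implementation at metadata.rs:144 may differ in detail):
// Inside Metadata::apply (sketch).
MetadataCmd::RolloverTopic { name, new_leader, sealed_segment_entry_count } => {
    let mut state = self.state.write().map_err(|e| e.to_string())?;
    let topic = state.topics.get_mut(&name)
        .ok_or_else(|| format!("topic not found: {name}"))?;
    let sealed = topic.current_segment;
    let old_leader = topic.leader_node;
    topic.sealed_segments.insert(sealed, sealed_segment_entry_count); // 1. seal current segment
    topic.segment_leaders.insert(sealed, old_leader);                 // 2. remember its leader
    topic.last_sealed_entry_offset += sealed_segment_entry_count;     // 3. advance global offset
    topic.current_segment += 1;                                       // 4. open the next segment
    topic.leader_node = new_leader;                                   // 5. hand over leadership
    topic.segment_leaders.insert(topic.current_segment, new_leader);  // 6. record new leader
    Ok(Bytes::from("ROLLED"))                                         // 7. success indicator
}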

UpsertNode

Registers a new node or updates an existing node’s address. Fields:
  • node_id - Unique node identifier
  • addr - Network address (host:port or IP:port)
State Changes:
  1. Inserts or updates entry in nodes map
  2. Returns "NODE" on success
Location: metadata.rs:167
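
UpsertNode is the simplest arm; a sketch:
// Inside Metadata::apply (sketch).
MetadataCmd::UpsertNode { node_id, addr } => {
    let mut state = self.state.write().map_err(|e| e.to_string())?;
    state.nodes.insert(node_id, addr);   // insert a new node or overwrite its previous address
    Ok(Bytes::from("NODE"))
}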

Metadata Interface

The Metadata struct wraps ClusterState with thread-safe access. Location: metadata.rs:51
#[derive(Clone)]
pub struct Metadata {
    state: Arc<RwLock<ClusterState>>,
}

Query Methods

get_topic_state

Retrieves the current state of a topic.
pub fn get_topic_state(&self, topic: &str) -> Option<TopicState>
Returns: Some(TopicState) if topic exists, None otherwise
Location: metadata.rs:62
Use Cases:
  • Determining current leader for append routing
  • Checking current segment number
  • Accessing segment history for reads
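
Given the field layout above, the body is presumably a short read-lock-and-clone; a sketch:
impl Metadata {
    pub fn get_topic_state(&self, topic: &str) -> Option<TopicState> {
        // Read lock; a poisoned lock degrades to None instead of panicking.
        self.state.read().ok()?.topics.get(topic).cloned()
    }
}
Cloning the TopicState out means the read guard is dropped as soon as the method returns, so callers never hold the lock while routing a request.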

get_node_addr

Looks up the network address for a node ID.
pub fn get_node_addr(&self, node_id: NodeId) -> Option<String>
Returns: Some(addr) if node is registered, None otherwise
Location: metadata.rs:67
Use Cases:
  • Resolving target address for RPC forwarding
  • Cluster topology discovery
  • Health checking

all_node_addrs

Returns all registered nodes and their addresses.
pub fn all_node_addrs(&self) -> Vec<(NodeId, String)>
Returns: Vector of (node_id, address) tuples
Location: metadata.rs:72
Use Cases:
  • Cluster-wide operations
  • Leader election (deterministic node ordering)
  • Topology snapshots
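
As an example of the "deterministic node ordering" use case, initial leader selection can hash the topic name over the sorted node list (this matches the note under CreateTopic above). The helper below is illustrative only; pick_initial_leader and the use of DefaultHasher are assumptions, not the codebase's actual scheme.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical helper: choose an initial leader for a new topic.
fn pick_initial_leader(metadata: &Metadata, topic: &str) -> Option<NodeId> {
    let mut nodes = metadata.all_node_addrs();
    nodes.sort_by_key(|(id, _)| *id);                  // deterministic ordering on every node
    if nodes.is_empty() {
        return None;
    }
    let mut hasher = DefaultHasher::new();
    topic.hash(&mut hasher);
    let idx = (hasher.finish() as usize) % nodes.len();
    Some(nodes[idx].0)                                 // same topic -> same leader, cluster-wide
}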

owned_topics

Finds all topics where the specified node is the current leader.
pub fn owned_topics(&self, node_id: NodeId) -> Vec<(String, u64)>
Returns: Vector of (topic_name, current_segment) tuples
Location: metadata.rs:83
Use Cases:
  • Lease management (determining which WAL keys this node should hold)
  • Node responsibility tracking
  • Load balancing analysis
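
A plausible body, sketched from the signature: filter the topic map by current leader.
impl Metadata {
    pub fn owned_topics(&self, node_id: NodeId) -> Vec<(String, u64)> {
        let Ok(state) = self.state.read() else { return Vec::new() };
        state.topics.iter()
            .filter(|(_, t)| t.leader_node == node_id)             // topics this node leads
            .map(|(name, t)| (name.clone(), t.current_segment))    // plus their active segment
            .collect()
    }
}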

sealed_count

Retrieves the entry count for a sealed segment.
pub fn sealed_count(&self, topic: &str, segment: u64) -> Option<u64>
Returns: Some(count) if segment is sealed, None if not found
Location: metadata.rs:97
Use Cases:
  • Read cursor advancement across segments
  • Determining when a segment has been fully consumed
  • Calculating logical offsets
Only sealed (historical) segments have entry counts recorded. The current active segment’s count is tracked separately in NodeController’s offsets map.
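
The sketch below shows how a reader might use sealed_count to decide when to hop to the next segment; the Cursor type and the advancement rule are illustrative, not the actual NodeController code.
// Hypothetical read cursor: position within a topic.
struct Cursor {
    segment: u64, // segment currently being read
    offset: u64,  // entries consumed within that segment
}

fn advance(cursor: &mut Cursor, metadata: &Metadata, topic: &str) {
    cursor.offset += 1;
    // Sealed segment fully consumed -> move to the next one.
    if let Some(count) = metadata.sealed_count(topic, cursor.segment) {
        if cursor.offset >= count {
            cursor.segment += 1;
            cursor.offset = 0;
        }
    }
    // None means this is the active segment; its length lives in
    // NodeController's offsets map, not in metadata (see note above).
}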

segment_leader

Determines which node was the leader for a specific segment.
pub fn segment_leader(&self, topic: &str, segment: u64) -> Option<NodeId>
Returns:
  • Some(node_id) from segment_leaders if recorded
  • Some(leader_node), the current leader, as a fallback if the segment has no recorded leader
  • None if the topic doesn’t exist
Location: metadata.rs:105
Use Cases:
  • Routing reads to the correct historical segment owner
  • Reconstructing segment ownership history
  • Debugging leadership changes
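
Putting segment_leader and get_node_addr together gives the read-routing lookup described above; a sketch (resolve_read_target is a hypothetical name):
// Which address should serve reads for this topic segment?
fn resolve_read_target(metadata: &Metadata, topic: &str, segment: u64) -> Option<String> {
    let leader = metadata.segment_leader(topic, segment)?; // historical leader, or current as fallback
    metadata.get_node_addr(leader)                         // None if the node was never registered
}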

State Machine Implementation

Metadata implements the StateMachineTrait from Octopii for Raft integration.

apply

Applies a committed Raft log entry to the state machine. Location: metadata.rs:116
fn apply(&self, command: &[u8]) -> Result<Bytes, String>
Process:
  1. Deserializes command bytes into MetadataCmd
  2. Acquires write lock on state
  3. Executes command logic (see MetadataCmd variants above)
  4. Returns success indicator or error message
Called By: Raft layer when log entries are committed by quorum
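
The overall shape is decode-then-dispatch; a sketch, assuming bincode for command encoding (matching the snapshot format), with the per-variant bodies elided since they are sketched under Metadata Commands above:
fn apply(&self, command: &[u8]) -> Result<Bytes, String> {
    // 1. Deserialize the committed log entry back into a command.
    let cmd: MetadataCmd = bincode::deserialize(command).map_err(|e| e.to_string())?;
    // 2-4. Take the write lock and execute the matching variant.
    match cmd {
        MetadataCmd::CreateTopic { .. } => todo!("see CreateTopic sketch above"),
        MetadataCmd::RolloverTopic { .. } => todo!("see RolloverTopic sketch above"),
        MetadataCmd::UpsertNode { .. } => todo!("see UpsertNode sketch above"),
    }
}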

snapshot

Creates a point-in-time snapshot of the entire cluster state. Location: metadata.rs:174
fn snapshot(&self) -> Vec<u8>
Returns: Bincode-serialized ClusterState
Use Cases:
  • Raft snapshot creation for log compaction
  • New node bootstrapping
  • Backup and disaster recovery
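
A sketch of snapshot, assuming bincode as stated above and treating a poisoned lock as an empty snapshot (the error handling is an assumption):
fn snapshot(&self) -> Vec<u8> {
    let state = match self.state.read() {
        Ok(guard) => guard.clone(),        // copy the ClusterState out, then release the read lock
        Err(_) => return Vec::new(),       // poisoned lock: nothing to snapshot
    };
    bincode::serialize(&state).unwrap_or_default()
}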

restore

Restores state from a snapshot. Location: metadata.rs:179
fn restore(&self, data: &[u8]) -> Result<(), String>
Process:
  1. Deserializes snapshot bytes into ClusterState
  2. Acquires write lock
  3. Replaces entire state with recovered data
Use Cases:
  • Node startup with existing cluster state
  • Raft snapshot installation
  • Disaster recovery
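
A sketch matching the three steps above (bincode assumed, as with snapshot):
fn restore(&self, data: &[u8]) -> Result<(), String> {
    // 1. Decode the snapshot back into a full ClusterState.
    let recovered: ClusterState = bincode::deserialize(data).map_err(|e| e.to_string())?;
    // 2-3. Swap it in wholesale under the write lock.
    let mut state = self.state.write().map_err(|e| e.to_string())?;
    *state = recovered;
    Ok(())
}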

Consistency Guarantees

Strong Consistency

All metadata operations go through Raft consensus:
  • Linearizable reads: Querying metadata reflects all committed updates
  • Atomic updates: Commands are applied atomically in log order
  • Durability: State is replicated across majority before acknowledgment

Read Locks

Query methods use read locks (RwLock::read()), allowing:
  • Multiple concurrent readers
  • No contention between query operations
  • Blocking only on writes (rare)

Write Locks

Modification methods (apply, restore) use write locks, ensuring:
  • Exclusive access during state changes
  • Serialized command application
  • Consistency with Raft log order

Initialization

Location: metadata.rs:56
pub fn new() -> Self {
    Self {
        state: Arc::new(RwLock::new(ClusterState::default())),
    }
}
Creates empty metadata with:
  • No topics
  • No registered nodes
  • Ready for Raft log application or snapshot restore

Thread Safety

  • Clone-friendly: Metadata is Clone via Arc, allowing sharing across async tasks
  • Lock poisoning: Gracefully handles lock poisoning by returning None/errors
  • No deadlocks: Single lock per operation, no nested locking

State Evolution Examples

Topic Creation Flow

Initial State:
  topics: {}
  nodes: {1: "node1:7000", 2: "node2:7000"}

CreateTopic { name: "events", initial_leader: 1 }

Resulting State:
  topics: {
    "events": TopicState {
      current_segment: 1,
      leader_node: 1,
      last_sealed_entry_offset: 0,
      sealed_segments: {},
      segment_leaders: {1: 1}
    }
  }

Segment Rollover Flow

Before Rollover:
  "events": TopicState {
    current_segment: 1,
    leader_node: 1,
    last_sealed_entry_offset: 0,
    sealed_segments: {},
    segment_leaders: {1: 1}
  }

RolloverTopic { 
  name: "events", 
  new_leader: 2, 
  sealed_segment_entry_count: 1000 
}

After Rollover:
  "events": TopicState {
    current_segment: 2,
    leader_node: 2,
    last_sealed_entry_offset: 1000,
    sealed_segments: {1: 1000},
    segment_leaders: {1: 1, 2: 2}
  }

Multiple Rollover Example

After 3 Rollovers:
  "events": TopicState {
    current_segment: 4,
    leader_node: 1,
    last_sealed_entry_offset: 3000,  // 1000 + 1000 + 1000
    sealed_segments: {1: 1000, 2: 1000, 3: 1000},
    segment_leaders: {1: 1, 2: 2, 3: 3, 4: 1}
  }

Integration with NodeController

The Metadata component works closely with NodeController:
  1. Lease Determination: owned_topics() tells NodeController which WAL keys to acquire leases for
  2. Append Routing: get_topic_state() provides leader information for write routing
  3. Read Routing: segment_leader() and sealed_count() enable cursor-based reads across segments
  4. Metadata Updates: NodeController calls propose_metadata() to modify cluster state
  5. Address Resolution: get_node_addr() enables RPC forwarding to remote nodes

Performance Characteristics

  • Read Operations: O(1) hash lookups with read lock contention only
  • Write Operations: O(1) plus Raft consensus latency (typically < 100ms)
  • Memory: Grows with number of topics and segments (historical data retained)
  • Snapshot Size: Linear in number of topics and sealed segments

Monitoring and Debugging

Topic Snapshot

Location: controller/mod.rs:189
pub fn topic_snapshot(&self, topic: &str) -> Result<String>
Returns JSON representation of a topic’s full state for debugging.
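
A plausible body given the signature: look the topic up in metadata and JSON-encode it (serde_json, an anyhow-style Result, and the self.metadata field are assumptions; TopicState already derives Serialize):
pub fn topic_snapshot(&self, topic: &str) -> Result<String> {
    let state = self
        .metadata                              // hypothetical field holding the Metadata handle
        .get_topic_state(topic)
        .ok_or_else(|| anyhow::anyhow!("unknown topic: {topic}"))?;
    Ok(serde_json::to_string_pretty(&state)?)  // pretty JSON for human debugging
}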

Raft Metrics

Access via NodeController::get_metrics() to see:
  • Current leader
  • Log indices
  • Membership configuration
  • Snapshot status
