

Overview

The NodeController is the core orchestration component in Walrus that glues together metadata (cluster state), bucket storage (Walrus I/O), and RPC surfaces (Kafka facade + internal forwarding). It coordinates all interactions between these systems without parsing Kafka protocol details or managing retention policies. Location: distributed-walrus/src/controller/mod.rs:31

Core Responsibilities

Request Routing

Routes append and read operations to the correct leader nodes based on metadata

Lease Management

Synchronizes write leases with metadata to ensure only leaders can write

Request Forwarding

Forwards requests to remote nodes when the current node is not the leader

Structure

pub struct NodeController {
    pub node_id: NodeId,
    pub bucket: Arc<Storage>,
    pub metadata: Arc<Metadata>,
    pub raft: Arc<OctopiiNode>,
    pub offsets: Arc<RwLock<HashMap<String, u64>>>,
    pub read_cursors: Arc<Mutex<HashMap<String, ReadCursor>>>,
    pub test_fail_forward_read: AtomicBool,
    pub test_fail_monitor: AtomicBool,
    pub test_fail_dir_size: AtomicBool,
}

Fields

Field        | Type                                     | Purpose
node_id      | NodeId                                   | Unique identifier for this node in the cluster
bucket       | Arc<Storage>                             | Handle to the underlying Walrus storage engine
metadata     | Arc<Metadata>                            | Cluster metadata including topic/segment/leader mappings
raft         | Arc<OctopiiNode>                         | Raft consensus node for metadata replication
offsets      | Arc<RwLock<HashMap<String, u64>>>        | Tracks logical offsets per WAL key
read_cursors | Arc<Mutex<HashMap<String, ReadCursor>>>  | Per-topic read cursors for shared consumption
test_fail_*  | AtomicBool                               | Fault injection flags for testing

Key Operations

Append Operations

append_for_topic

Appends data to a topic. Routes the request to the current leader node. Location: mod.rs:165
pub async fn append_for_topic(&self, topic: &str, data: Vec<u8>) -> Result<()>
Behavior:
  1. Looks up topic state from metadata to find the current leader and segment
  2. If this node is the leader, performs local append via forward_append
  3. Otherwise, forwards the request to the remote leader via forward_append_remote
  4. Returns error if topic doesn’t exist
Only the designated leader for a topic’s current segment can append locally; a non-leader node either forwards the request to that leader or returns an error.
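The leader check can be sketched as a pure routing decision. The names below (`route_append`, `AppendRoute`) are hypothetical; the real controller resolves leadership from replicated metadata before choosing a path:

```rust
// Hypothetical sketch of the append-routing decision in append_for_topic.
// NodeId is a plain u64 here for illustration.
type NodeId = u64;

#[derive(Debug, PartialEq)]
enum AppendRoute {
    Local,          // this node leads the segment: append via forward_append
    Remote(NodeId), // forward to the leader via forward_append_remote
}

fn route_append(self_id: NodeId, segment_leader: Option<NodeId>) -> Result<AppendRoute, String> {
    match segment_leader {
        // Topic (or its current segment) is unknown: surface an error.
        None => Err("unknown topic".to_string()),
        Some(leader) if leader == self_id => Ok(AppendRoute::Local),
        Some(leader) => Ok(AppendRoute::Remote(leader)),
    }
}
```

Keeping the decision pure like this makes the routing logic testable independently of the storage and RPC layers.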

forward_append (Internal)

Handles local append operations with lease verification and retry logic. Location: internal.rs:6
pub(crate) async fn forward_append(&self, wal_key: String, data: Vec<u8>) -> InternalResp
Process:
  1. Updates leases to ensure this node has write permission
  2. Attempts append with retry via append_with_retry (2 attempts)
  3. Records the append in offset tracking
  4. Checks if segment rollover is needed via maybe_rollover
  5. Returns InternalResp::Ok or InternalResp::Error

append_with_retry

Retry logic for appends with lease synchronization. Location: mod.rs:540
async fn append_with_retry(&self, wal_key: &str, data: Vec<u8>) -> Result<()>
Makes up to 2 attempts to append, refreshing leases between attempts if needed.
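The two-attempt shape can be sketched with closures standing in for the storage append and the lease refresh (signatures hypothetical; the real method calls update_leases between attempts):

```rust
// Sketch of the retry shape in append_with_retry: up to two attempts,
// re-syncing leases before the second in case the first failure was a
// stale lease.
fn append_with_retry<F, G>(mut try_append: F, mut refresh_leases: G) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
    G: FnMut(),
{
    const MAX_ATTEMPTS: u32 = 2;
    let mut last_err = String::new();
    for attempt in 0..MAX_ATTEMPTS {
        match try_append() {
            Ok(()) => return Ok(()),
            Err(e) => {
                last_err = e;
                // Refresh leases only if another attempt remains.
                if attempt + 1 < MAX_ATTEMPTS {
                    refresh_leases();
                }
            }
        }
    }
    Err(last_err)
}
```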

Read Operations

read_one_for_topic

Reads a single entry for a topic using the provided cursor, automatically advancing across sealed segments. Location: mod.rs:199
pub async fn read_one_for_topic(
    &self,
    topic: &str,
    cursor: &mut ReadCursor,
) -> Result<Option<Vec<u8>>>
Cursor Management:
  • Initializes cursor to segment 1 if unset
  • Advances to next segment when current segment is fully consumed
  • Uses metadata’s sealed_count to determine when segments are complete
  • Routes reads to appropriate leader (current or historical segment leader)
Returns:
  • Ok(Some(data)) - Successfully read an entry
  • Ok(None) - No data available (reached end of current segment)
  • Err(_) - Topic doesn’t exist or read error
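The cursor-advance rule described above can be sketched as follows (a minimal sketch: `advance_if_exhausted` and its parameters are hypothetical, and the real code folds this into the read loop):

```rust
// Hypothetical sketch of cursor advancement in read_one_for_topic.
// A segment with index <= sealed_count is sealed (no more writes), so once
// every entry in it has been delivered the cursor moves to the next segment.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct ReadCursor {
    pub segment: u64,
    pub delivered_in_segment: u64,
}

fn advance_if_exhausted(cursor: &mut ReadCursor, sealed_count: u64, entries_in_segment: u64) {
    if cursor.segment == 0 {
        // Unset cursor: start at segment 1.
        cursor.segment = 1;
        cursor.delivered_in_segment = 0;
        return;
    }
    let sealed = cursor.segment <= sealed_count;
    if sealed && cursor.delivered_in_segment >= entries_in_segment {
        cursor.segment += 1;
        cursor.delivered_in_segment = 0;
    }
}
```

Note that an unsealed segment never triggers advancement, which is why a reader that drains the live segment sees Ok(None) rather than moving past it.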

read_one_for_topic_shared

Shared read path that maintains a per-topic cursor across client connections. Location: mod.rs:271
pub async fn read_one_for_topic_shared(&self, topic: &str) -> Result<Option<Vec<u8>>>
Uses a shared cursor stored in read_cursors to enable consumer groups or shared consumption patterns.

forward_read (Internal)

Performs local read from storage. Location: mod.rs:304
async fn forward_read(&self, wal_key: &str, _max_entries: usize) -> InternalResp
Returns InternalResp::ReadResult with data and high watermark (tracked entry count).

Topic Management

ensure_topic

Creates a topic if it doesn’t already exist, selecting an initial leader. Location: mod.rs:124
pub async fn ensure_topic(&self, topic: &str) -> Result<()>
Leader Selection:
  • If no nodes exist, uses current node as leader
  • Otherwise, hashes topic name to deterministically select a node
  • Proposes MetadataCmd::CreateTopic to Raft
  • Refreshes leases after creation
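The deterministic selection can be sketched with the standard library hasher (an assumption: the actual hash function used by ensure_topic may differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical sketch of initial-leader selection in ensure_topic:
// hash the topic name and index into the known node list, falling back
// to the current node when the cluster has no registered nodes yet.
fn pick_initial_leader(topic: &str, nodes: &[u64], self_id: u64) -> u64 {
    if nodes.is_empty() {
        return self_id;
    }
    let mut h = DefaultHasher::new();
    topic.hash(&mut h);
    nodes[(h.finish() as usize) % nodes.len()]
}
```

Hashing the topic name means every node proposes the same initial leader without coordination.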

maybe_rollover (Internal)

Checks if a segment has reached capacity and triggers rollover to a new segment. Location: mod.rs:457
async fn maybe_rollover(&self, topic: &str, segment: u64) -> Result<()>
Process:
  1. Checks if tracked entry count exceeds max_segment_entries() threshold
  2. Retrieves current Raft membership to select next leader
  3. Round-robin leader selection among cluster members
  4. Proposes MetadataCmd::RolloverTopic with sealed segment count
Segment rollover is triggered automatically after appends. The sealed segment count is recorded in metadata for cursor advancement during reads.
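The threshold check and round-robin rotation can be sketched as two small helpers (names hypothetical; the real code reads the threshold from max_segment_entries() and membership from Raft):

```rust
// Hypothetical sketch of the rollover decision in maybe_rollover.
fn should_rollover(tracked_entries: u64, max_segment_entries: u64) -> bool {
    tracked_entries >= max_segment_entries
}

// Round-robin: the node after the current leader in membership order
// leads the next segment. Assumes members is non-empty.
fn next_leader_round_robin(members: &[u64], current: u64) -> u64 {
    match members.iter().position(|&m| m == current) {
        Some(i) => members[(i + 1) % members.len()],
        // Current leader left the cluster: restart the rotation.
        None => members[0],
    }
}
```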

Lease Management

update_leases

Synchronizes storage leases with current metadata state. Location: mod.rs:91
pub async fn update_leases(&self)
Process:
  1. Queries metadata for all topics where node_id is the leader
  2. Generates set of expected WAL keys (t_<topic>_s_<segment>)
  3. Syncs peer addresses from metadata to Raft layer
  4. Updates bucket storage with new lease set
Called by:
  • Lease update loop (every 100ms)
  • After metadata changes
  • During retry operations
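Steps 1-2 above amount to computing the set of WAL keys this node should hold leases for. A minimal sketch, assuming topic state is available as (name, current_segment, leader) triples (the real metadata type is richer):

```rust
use std::collections::HashSet;

// Hypothetical sketch of the lease-set computation in update_leases:
// for every topic this node currently leads, the expected WAL key is
// t_<topic>_s_<segment>.
fn expected_lease_keys(self_id: u64, topics: &[(&str, u64, u64)]) -> HashSet<String> {
    topics
        .iter()
        .filter(|&&(_, _, leader)| leader == self_id)
        .map(|&(name, segment, _)| format!("t_{}_s_{}", name, segment))
        .collect()
}
```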

run_lease_update_loop

Background task that periodically refreshes leases. Location: mod.rs:284
pub async fn run_lease_update_loop(self: Arc<Self>)
Runs indefinitely with 100ms intervals, calling update_leases().

Request Forwarding

forward_append_remote

Forwards append requests to remote leader nodes. Location: mod.rs:321
async fn forward_append_remote(
    &self,
    leader_node: NodeId,
    wal_key: String,
    data: Vec<u8>,
) -> Result<()>
Process:
  1. Resolves leader node address from metadata
  2. Serializes InternalOp::ForwardAppend payload
  3. Sends RPC request via Raft handler with 15s timeout
  4. Deserializes response and returns result

forward_read_remote

Forwards read requests to remote nodes. Location: mod.rs:494
async fn forward_read_remote(
    &self,
    leader_node: NodeId,
    wal_key: String,
) -> Result<Vec<Vec<u8>>>
Similar to append forwarding but for read operations, returning the data payload.

Metadata Operations

propose_metadata

Proposes metadata changes through Raft consensus. Location: mod.rs:373
pub(crate) async fn propose_metadata(&self, cmd: MetadataCmd) -> Result<()>
Leadership Handling:
  • If this node is Raft leader, proposes directly with 10s timeout
  • Otherwise, waits up to 5s to discover current leader
  • Forwards proposal to remote leader if necessary
  • Retries with 100ms intervals if leader is unknown

upsert_node

Registers or updates a node in cluster metadata. Location: mod.rs:158
pub async fn upsert_node(&self, node_id: NodeId, addr: String) -> Result<()>
Proposes MetadataCmd::UpsertNode to Raft.

handle_join_cluster

Handles cluster join requests from new nodes. Location: mod.rs:567
async fn handle_join_cluster(&self, node_id: u64, addr: String) -> InternalResp
Process:
  1. Resolves node address (with DNS lookup fallback)
  2. Adds node as Raft learner
  3. Records node in metadata via upsert_node
  4. Spawns background task to promote learner to voter once caught up (60s timeout, 120 attempts)

RPC Handler

handle_rpc

Main RPC dispatch handler for internal operations. Location: mod.rs:44
pub async fn handle_rpc(&self, op: InternalOp) -> InternalResp
Supported Operations:
  • ForwardAppend - Handles forwarded append requests
  • ForwardRead - Handles forwarded read requests
  • ForwardMetadata - Handles forwarded metadata proposals
  • JoinCluster - Handles cluster join requests
  • TestControl - Fault injection for testing
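The dispatch is a match over the operation enum. A simplified sketch covering a subset of the variants, with string responses standing in for InternalResp (payload shapes hypothetical):

```rust
// Hypothetical sketch of the handle_rpc dispatch shape. The real handler
// covers all five variants and returns InternalResp.
enum InternalOp {
    ForwardAppend { wal_key: String, data: Vec<u8> },
    ForwardRead { wal_key: String },
    JoinCluster { node_id: u64, addr: String },
}

fn dispatch(op: InternalOp) -> String {
    match op {
        InternalOp::ForwardAppend { wal_key, data } => {
            format!("append {} bytes to {}", data.len(), wal_key)
        }
        InternalOp::ForwardRead { wal_key } => format!("read from {}", wal_key),
        InternalOp::JoinCluster { node_id, addr } => {
            format!("join node {} at {}", node_id, addr)
        }
    }
}
```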

Helper Functions

wal_key

Generates a WAL key from topic name and segment number. Location: types.rs:3
pub fn wal_key(topic: &str, segment: u64) -> String
Format: t_<topic>_s_<segment> (e.g., t_events_s_1)

parse_wal_key

Parses a WAL key back into topic and segment. Location: types.rs:8
pub fn parse_wal_key(wal_key: &str) -> Option<(String, u64)>
Example: t_events_s_1 → ("events", 1)
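The two helpers can be sketched as follows (a sketch of the documented format, not the exact types.rs source; this parse splits on the last "_s_" so topic names containing underscores still round-trip):

```rust
// Build a WAL key in the documented t_<topic>_s_<segment> format.
pub fn wal_key(topic: &str, segment: u64) -> String {
    format!("t_{}_s_{}", topic, segment)
}

// Recover (topic, segment) from a WAL key, or None if it doesn't match.
pub fn parse_wal_key(wal_key: &str) -> Option<(String, u64)> {
    let rest = wal_key.strip_prefix("t_")?;
    let idx = rest.rfind("_s_")?;
    let (topic, seg) = (&rest[..idx], &rest[idx + 3..]);
    seg.parse::<u64>().ok().map(|s| (topic.to_string(), s))
}
```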

Read Cursor

Location: types.rs:19
#[derive(Default, Clone, Debug)]
pub struct ReadCursor {
    pub segment: u64,
    pub delivered_in_segment: u64,
}
Tracks read position within a topic:
  • segment - Current segment being read
  • delivered_in_segment - Number of entries delivered from this segment

Lifecycle

  1. Initialization - Controller is created with node ID, storage, metadata, and Raft instances
  2. Lease Loop - Background task continuously syncs leases with metadata
  3. Request Processing - Handles Kafka API calls and internal RPC operations
  4. Leader Changes - Automatically adapts to leadership changes via metadata updates
  5. Segment Rollover - Automatically creates new segments when capacity reached

Thread Safety

All shared state is protected by:
  • RwLock for offsets (read-heavy, write-occasional)
  • Mutex for read cursors (exclusive access needed)
  • Atomic bools for test flags (lock-free)
  • Arc for shared ownership across async tasks
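The offsets field illustrates the pattern. A minimal sketch of updating a shared offset map through Arc<RwLock<...>> (`bump_offset` is a hypothetical helper, not a controller method):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Take the write lock briefly, bump the offset for one WAL key, and
// return the new value. Readers elsewhere can hold the read lock
// concurrently with each other.
fn bump_offset(offsets: &Arc<RwLock<HashMap<String, u64>>>, wal_key: &str) -> u64 {
    let mut map = offsets.write().expect("offsets lock poisoned");
    let entry = map.entry(wal_key.to_string()).or_insert(0);
    *entry += 1;
    *entry
}
```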

Error Handling

The controller surfaces several failure classes:
  • Lease errors - returned when attempting to append to a WAL key without a valid lease; clients should retry against the correct leader.
  • Metadata proposal timeouts - proposals time out after 10 seconds, indicating Raft consensus issues or cluster unavailability.
  • Unknown topic - returned when operating on a topic that hasn’t been created; call ensure_topic first.
  • Forwarding failures - remote forwarding can fail due to network issues, leader unavailability, or lease mismatches, and is bounded by the 15s RPC timeout.

Testing Hooks

The controller includes fault injection capabilities for testing:
  • test_fail_forward_read - Forces read forwarding to fail
  • test_fail_monitor - Forces monitoring operations to fail
  • test_fail_dir_size - Forces directory size checks to fail
These are controlled via TestControl RPC operations.
