Writing Data
append_for_topic
Append a single entry to a topic. This is the fundamental write operation in Walrus.
pub fn append_for_topic(&self, col_name: &str, raw_bytes: &[u8]) -> std::io::Result<()>
The topic name to write to. Topics are created automatically on first write.
The raw byte data to append. Maximum size is limited by block size (10MB).
Example
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// Append a simple message
wal.append_for_topic("my-topic", b"Hello, Walrus!")?;
// Append serialized data
let data = serde_json::json!({
"event": "user_signup",
"user_id": 12345,
"timestamp": 1234567890
});
wal.append_for_topic("events", data.to_string().as_bytes())?;
// Append binary data
let binary_data = vec![0x01, 0x02, 0x03, 0x04];
wal.append_for_topic("binary", &binary_data)?;
How It Works
When you call append_for_topic:
- Writer Creation: If this is the first write to the topic, a Writer is created and allocated an initial 10MB block (src/wal/runtime/walrus.rs:206-238)
- Block Check: The writer checks whether the entry fits in the current block (src/wal/runtime/writer.rs:70-78)
- Write Entry: The entry is written with a 256-byte metadata header containing:
  - Entry size
  - Topic name
  - Checksum (FNV-1a 64-bit)
  - Next block pointer
- Fsync Handling: Depending on FsyncSchedule, the write is either:
  - Immediately synced to disk (SyncEach)
  - Queued for background fsync (Milliseconds)
  - Not synced (NoFsync)
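The three schedules differ only in when the flush happens. A minimal standalone sketch of that decision (the `FsyncSchedule` variant names match the docs; the enum definition and `sync_action` helper are illustrative, not the walrus-rust API):

```rust
// Illustrative sketch of the three fsync policies described above.
// Variant names mirror the docs; this is not the crate's own type.
enum FsyncSchedule {
    SyncEach,          // fsync after every append
    Milliseconds(u64), // a background thread fsyncs on this interval
    NoFsync,           // rely on the OS page cache
}

// Hypothetical helper mapping each policy to the action taken on write.
fn sync_action(schedule: &FsyncSchedule) -> &'static str {
    match schedule {
        FsyncSchedule::SyncEach => "fsync now",
        FsyncSchedule::Milliseconds(_) => "queue for background fsync",
        FsyncSchedule::NoFsync => "skip fsync",
    }
}

fn main() {
    assert_eq!(sync_action(&FsyncSchedule::SyncEach), "fsync now");
    assert_eq!(
        sync_action(&FsyncSchedule::Milliseconds(200)),
        "queue for background fsync"
    );
}
```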
Block Sealing
If an entry doesn’t fit in the current block, the block is automatically sealed:
// From src/wal/runtime/writer.rs:70-96
let need = (PREFIX_META_SIZE as u64) + (data.len() as u64);
if *cur + need > block.limit {
// Seal current block
FileStateTracker::set_block_unlocked(block.id as usize);
let mut sealed = block.clone();
sealed.used = *cur;
sealed.mmap.flush()?;
// Append to reader chain for consumption
let _ = self.reader.append_block_to_chain(&self.col, sealed);
// Allocate new block
let new_block = unsafe { self.allocator.alloc_block(need) }?;
*block = new_block;
*cur = 0;
}
Sealed blocks become available for reading immediately.
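The sealing condition is plain arithmetic over the 256-byte header plus the payload. A self-contained sketch under the stated assumptions (`PREFIX_META_SIZE` of 256 and the 10MB initial block size come from the docs; the function name is illustrative):

```rust
// Standalone sketch of the fit check from the sealing code above.
// The constants mirror the docs; `entry_fits` is illustrative,
// not part of the walrus-rust API.
const PREFIX_META_SIZE: u64 = 256;
const BLOCK_LIMIT: u64 = 10 * 1024 * 1024; // 10MB initial block

/// Returns true when an entry with `payload_len` bytes of data still
/// fits at write offset `cur`; false means the block must be sealed
/// and a new one allocated.
fn entry_fits(cur: u64, payload_len: u64) -> bool {
    cur + PREFIX_META_SIZE + payload_len <= BLOCK_LIMIT
}

fn main() {
    assert!(entry_fits(0, 1024)); // fresh block, small entry
    // Header + payload crossing the limit forces a seal.
    assert!(!entry_fits(BLOCK_LIMIT - 100, 1024));
}
```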
Reading Data
read_next
Read the next entry from a topic, optionally checkpointing the read position.
pub fn read_next(&self, col_name: &str, checkpoint: bool) -> io::Result<Option<Entry>>
The topic name to read from.
true: Consume the entry and advance the read cursor (persisted based on consistency model)
false: Peek at the entry without consuming it (cursor stays at current position)
Return Value
Returns Option<Entry> where:
- Some(Entry) if an entry is available
- None if no more entries are available (caught up with writes)
pub struct Entry {
pub data: Vec<u8>,
}
Example: Checkpointing
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// Write some data
wal.append_for_topic("events", b"event-1")?;
wal.append_for_topic("events", b"event-2")?;
wal.append_for_topic("events", b"event-3")?;
// Checkpoint=true: consume and advance cursor
if let Some(entry) = wal.read_next("events", true)? {
println!("Consumed: {:?}", String::from_utf8_lossy(&entry.data));
// Cursor is now at event-2
}
// Next read gets event-2
if let Some(entry) = wal.read_next("events", true)? {
println!("Consumed: {:?}", String::from_utf8_lossy(&entry.data));
// Cursor is now at event-3
}
Example: Peeking
use walrus_rust::Walrus;
let wal = Walrus::new()?;
wal.append_for_topic("events", b"event-1")?;
// Checkpoint=false: peek without consuming
if let Some(entry) = wal.read_next("events", false)? {
println!("Peeking: {:?}", String::from_utf8_lossy(&entry.data));
// Cursor still at event-1
}
// Reading again returns the same entry
if let Some(entry) = wal.read_next("events", false)? {
println!("Still peeking: {:?}", String::from_utf8_lossy(&entry.data));
// Still at event-1
}
// Now consume it
if let Some(entry) = wal.read_next("events", true)? {
println!("Consumed: {:?}", String::from_utf8_lossy(&entry.data));
// Cursor advanced past event-1
}
// No more entries
assert!(wal.read_next("events", true)?.is_none());
How It Works
read_next operates in two phases:
1. Sealed Block Path (src/wal/runtime/walrus_read.rs:124-188)
Reads from sealed, immutable blocks in the reader chain:
if info.cur_block_idx < info.chain.len() {
let idx = info.cur_block_idx;
let off = info.cur_block_offset;
let block = info.chain[idx].clone();
if off >= block.used {
// Block exhausted, move to next
BlockStateTracker::set_checkpointed_true(block.id as usize);
info.cur_block_idx += 1;
info.cur_block_offset = 0;
continue;
}
// Read entry from block
match block.read(off) {
Ok((entry, consumed)) => {
let new_off = off + consumed as u64;
if checkpoint {
info.cur_block_offset = new_off;
// Persist position based on consistency model
maybe_persist = if self.should_persist(&mut info, false) {
Some((info.cur_block_idx as u64, new_off))
} else { None };
}
return Ok(Some(entry));
}
Err(_) => return Ok(None),
}
}
2. Tail Path (src/wal/runtime/walrus_read.rs:191-343)
Reads from the active writer block (the “tail”):
// Get active writer's block and offset
let (active_block, written) = writer_arc.snapshot_block()?;
if tail_off < written {
match active_block.read(tail_off) {
Ok((entry, consumed)) => {
let new_off = tail_off + consumed as u64;
if checkpoint {
info.tail_block_id = active_block.id;
info.tail_offset = new_off;
// Persist with tail sentinel flag
maybe_persist = if self.should_persist(&mut info, false) {
Some((tail_block_id | TAIL_FLAG, new_off))
} else { None };
}
return Ok(Some(entry));
}
Err(_) => return Ok(None),
}
}
The tail path uses a special sentinel flag (1u64 << 63) in the persisted block ID to indicate that the position is in the active writer block, not a sealed block.
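The sentinel is just the top bit of the persisted block ID. A small sketch of how such an encoding round-trips (the `1u64 << 63` flag value comes from the docs; the helper functions are illustrative):

```rust
// Sketch of the tail sentinel encoding described above. TAIL_FLAG
// matches the docs (1u64 << 63); the helpers are illustrative.
const TAIL_FLAG: u64 = 1u64 << 63;

/// Set the high bit when the position lives in the active writer block.
fn encode_position(block_id: u64, in_tail: bool) -> u64 {
    if in_tail { block_id | TAIL_FLAG } else { block_id }
}

/// Recover the block ID and whether it referred to the tail.
fn decode_position(persisted: u64) -> (u64, bool) {
    (persisted & !TAIL_FLAG, persisted & TAIL_FLAG != 0)
}

fn main() {
    let p = encode_position(42, true);
    assert_eq!(decode_position(p), (42, true));
    assert_eq!(decode_position(encode_position(42, false)), (42, false));
}
```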
Persistence Behavior
When checkpoint=true, the read position is persisted according to the consistency model:
StrictlyAtOnce
// Every checkpoint is immediately persisted
let wal = Walrus::with_consistency(ReadConsistency::StrictlyAtOnce)?;
wal.read_next("topic", true)?; // Persists to disk
AtLeastOnce
// Persists every N reads
let wal = Walrus::with_consistency(
ReadConsistency::AtLeastOnce { persist_every: 100 }
)?;
for _ in 0..100 {
wal.read_next("topic", true)?; // Only 100th read persists
}
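The policy behind `persist_every` is a simple counter: only every Nth checkpointed read hits disk. An illustrative sketch (not the walrus-rust internals, just the behavior the mode describes):

```rust
// Illustrative counter behind AtLeastOnce { persist_every: N }: only
// every Nth checkpointed read triggers a disk write. This is a sketch
// of the policy, not the crate's implementation.
struct AtLeastOncePolicy {
    persist_every: u32,
    reads_since_persist: u32,
}

impl AtLeastOncePolicy {
    fn new(persist_every: u32) -> Self {
        Self { persist_every, reads_since_persist: 0 }
    }

    /// Called on each checkpointed read; returns true when the
    /// read cursor should be persisted to disk.
    fn should_persist(&mut self) -> bool {
        self.reads_since_persist += 1;
        if self.reads_since_persist >= self.persist_every {
            self.reads_since_persist = 0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut policy = AtLeastOncePolicy::new(100);
    let persisted = (0..100).filter(|_| policy.should_persist()).count();
    assert_eq!(persisted, 1); // only the 100th read persists
}
```

The trade-off: a crash loses at most `persist_every - 1` checkpoints, so those entries may be redelivered after restart, hence "at least once".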
See Configuration for more details on consistency models.
Entry Metadata
Each entry is stored with a 256-byte metadata header (src/wal/block.rs:12-19):
struct Metadata {
read_size: usize, // Payload size
owned_by: String, // Topic name
next_block_start: u64, // Pointer to next block
checksum: u64, // FNV-1a 64-bit checksum
}
The header format (src/wal/block.rs:63-68):
[2 bytes: metadata length] [metadata bytes] [padding to 256 bytes]
This allows forward compatibility: if the metadata structure grows, older readers can skip unknown fields.
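A self-contained sketch of that length-prefixed layout (the 256-byte header and 2-byte length prefix come from the docs; the little-endian byte order and the helper names are assumptions made for illustration):

```rust
// Sketch of the [2 bytes: metadata length][metadata bytes][padding]
// header layout described above. The 256-byte size matches the docs;
// the byte order and helpers are illustrative assumptions.
const HEADER_SIZE: usize = 256;

fn encode_header(meta_bytes: &[u8]) -> [u8; HEADER_SIZE] {
    assert!(meta_bytes.len() <= HEADER_SIZE - 2);
    let mut header = [0u8; HEADER_SIZE];
    header[..2].copy_from_slice(&(meta_bytes.len() as u16).to_le_bytes());
    header[2..2 + meta_bytes.len()].copy_from_slice(meta_bytes);
    header // remaining bytes stay zero as padding
}

fn decode_header(header: &[u8; HEADER_SIZE]) -> &[u8] {
    let len = u16::from_le_bytes([header[0], header[1]]) as usize;
    // Older readers only consume the declared length and skip the rest,
    // which is what makes the format forward compatible.
    &header[2..2 + len]
}

fn main() {
    let header = encode_header(b"topic=events");
    assert_eq!(decode_header(&header), b"topic=events");
}
```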
Checksum Verification
Every read verifies the data integrity using FNV-1a 64-bit checksums (src/wal/block.rs:116-129):
// Verify checksum
let expected = meta.checksum;
if checksum64(&ret_buffer) != expected {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"checksum mismatch, data corruption detected",
));
}
Corrupted entries return an error instead of silently returning bad data.
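FNV-1a 64-bit is small enough to show in full. The constants below are the standard FNV-1a offset basis and prime; `checksum64` here is a reimplementation for illustration, not the crate's own function:

```rust
// FNV-1a 64-bit, the checksum algorithm the docs name. Standard
// constants; this is an illustrative reimplementation.
fn checksum64(data: &[u8]) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = FNV_OFFSET_BASIS;
    for &byte in data {
        hash ^= byte as u64; // xor first, then multiply: that's the "1a" variant
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash
}

fn main() {
    // Empty input hashes to the offset basis.
    assert_eq!(checksum64(b""), 0xcbf2_9ce4_8422_2325);
    // A single flipped bit changes the checksum, so corruption is caught.
    let stored = checksum64(b"Hello, Walrus!");
    assert_ne!(checksum64(b"Hello, walrus!"), stored);
    assert_eq!(checksum64(b"Hello, Walrus!"), stored);
}
```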
Topic Entry Counting
Walrus tracks the number of unconsumed entries per topic in memory (src/wal/runtime/walrus.rs:129-143):
let wal = Walrus::new()?;
wal.append_for_topic("events", b"e1")?;
wal.append_for_topic("events", b"e2")?;
assert_eq!(wal.get_topic_entry_count("events"), 2);
// Checkpoint decrements count
wal.read_next("events", true)?;
assert_eq!(wal.get_topic_entry_count("events"), 1);
// Peek doesn't decrement
wal.read_next("events", false)?;
assert_eq!(wal.get_topic_entry_count("events"), 1);
Counts survive restarts by reconstructing from the persisted read offset and WAL files.
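The bookkeeping behind those assertions can be sketched as a per-topic map: appends increment, checkpointed reads decrement, peeks leave the count untouched. An illustrative model, not the walrus-rust internals:

```rust
// Illustrative per-topic unconsumed-entry counter modeling the
// behavior shown above. Not the crate's implementation.
use std::collections::HashMap;

#[derive(Default)]
struct TopicCounts(HashMap<String, u64>);

impl TopicCounts {
    fn on_append(&mut self, topic: &str) {
        *self.0.entry(topic.to_string()).or_insert(0) += 1;
    }
    fn on_read(&mut self, topic: &str, checkpoint: bool) {
        // Only a checkpointed read consumes an entry; a peek does not.
        if checkpoint {
            if let Some(n) = self.0.get_mut(topic) {
                *n = n.saturating_sub(1);
            }
        }
    }
    fn count(&self, topic: &str) -> u64 {
        self.0.get(topic).copied().unwrap_or(0)
    }
}

fn main() {
    let mut c = TopicCounts::default();
    c.on_append("events");
    c.on_append("events");
    c.on_read("events", true);  // consume: 2 -> 1
    c.on_read("events", false); // peek: still 1
    assert_eq!(c.count("events"), 1);
}
```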
Error Handling
Both operations can return std::io::Error:
use walrus_rust::Walrus;
use std::io::ErrorKind;
let wal = Walrus::new()?;
// Handle write errors
match wal.append_for_topic("topic", b"data") {
Ok(()) => println!("Write successful"),
Err(e) if e.kind() == ErrorKind::OutOfMemory => {
eprintln!("No space available for allocation");
}
Err(e) => eprintln!("Write failed: {}", e),
}
// Handle read errors
match wal.read_next("topic", true) {
Ok(Some(entry)) => println!("Read: {:?}", entry.data),
Ok(None) => println!("No more entries"),
Err(e) if e.kind() == ErrorKind::InvalidData => {
eprintln!("Checksum mismatch - data corruption");
}
Err(e) => eprintln!("Read failed: {}", e),
}
- Batch When Possible: For multiple writes, use batch_append_for_topic for better throughput
- Adjust Fsync Schedule: Lower fsync frequency increases throughput at the cost of durability
- Use AtLeastOnce: For high-throughput reads, AtLeastOnce mode reduces persistence overhead
- Peek Strategically: Use checkpoint=false when you need to inspect data before committing to consumption
Next Steps