Documentation Index
Fetch the complete documentation index at: https://mintlify.com/nubskr/walrus/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Walrus provides atomic batch operations for high-throughput workloads. On Linux with the FD backend (default), batch operations automatically use io_uring for parallel I/O submission, significantly improving performance.
Batch Limits:
- Maximum 2,000 entries per batch
- Maximum ~10GB total payload per batch
io_uring Acceleration:
- Automatically enabled on Linux with FD backend
- Parallel I/O submission for reads and writes
- Falls back to sequential operations on other platforms or with mmap backend
Batch Writes
batch_append_for_topic
Atomically append multiple entries to a topic. All entries succeed or all fail (all-or-nothing semantics).
pub fn batch_append_for_topic(&self, col_name: &str, batch: &[&[u8]]) -> std::io::Result<()>
col_name: The topic name to write to.
batch: A slice of byte slices, each representing one entry. Maximum 2,000 entries.
Example
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// Batch append multiple entries
let batch = vec![
b"entry 1".as_slice(),
b"entry 2".as_slice(),
b"entry 3".as_slice(),
];
wal.batch_append_for_topic("events", &batch)?;
// All three entries are now available for reading
for _ in 0..3 {
if let Some(entry) = wal.read_next("events", true)? {
println!("Read: {:?}", String::from_utf8_lossy(&entry.data));
}
}
Atomic Behavior
Batch writes are atomic - either all entries are written or the operation fails:
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// This batch will either write all 1000 entries or none
let mut batch = Vec::new();
for i in 0..1000 {
batch.push(format!("event-{}", i).into_bytes());
}
let batch_refs: Vec<&[u8]> = batch.iter().map(|v| v.as_slice()).collect();
match wal.batch_append_for_topic("events", &batch_refs) {
Ok(()) => println!("All 1000 entries written successfully"),
Err(e) => println!("Batch failed, zero entries written: {}", e),
}
How It Works
The batch write implementation (src/wal/runtime/writer.rs:135-324) follows these phases:
Phase 1: Validation
// From src/wal/runtime/writer.rs:147-165
if batch.len() > MAX_BATCH_ENTRIES {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("batch exceeds {} entry limit", MAX_BATCH_ENTRIES),
));
}
let total_bytes: u64 = batch
.iter()
.map(|data| (PREFIX_META_SIZE as u64) + (data.len() as u64))
.sum();
if total_bytes > MAX_BATCH_BYTES {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"batch exceeds 10GB limit",
));
}
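Stripped of the surrounding writer state, the validation arithmetic can be reproduced as a standalone sketch. The `PREFIX_META_SIZE` value here is an illustrative assumption, not the crate's actual constant:

```rust
// Standalone sketch of the batch validation arithmetic.
// PREFIX_META_SIZE is an assumed per-entry header size for illustration.
const MAX_BATCH_ENTRIES: usize = 2_000;
const MAX_BATCH_BYTES: u64 = 10 * 1024 * 1024 * 1024; // ~10GB
const PREFIX_META_SIZE: usize = 64; // assumed header size

fn validate_batch(batch: &[&[u8]]) -> Result<u64, String> {
    if batch.len() > MAX_BATCH_ENTRIES {
        return Err(format!("batch exceeds {} entry limit", MAX_BATCH_ENTRIES));
    }
    // Each entry costs its payload plus a fixed metadata prefix.
    let total_bytes: u64 = batch
        .iter()
        .map(|data| (PREFIX_META_SIZE as u64) + (data.len() as u64))
        .sum();
    if total_bytes > MAX_BATCH_BYTES {
        return Err("batch exceeds 10GB limit".to_string());
    }
    Ok(total_bytes)
}

fn main() {
    // Two small entries: 2 headers (64 bytes each) + 3 payload bytes = 131.
    let small: Vec<&[u8]> = vec![b"a".as_slice(), b"bb".as_slice()];
    assert_eq!(validate_batch(&small), Ok(131));

    // 2,001 entries trips the entry-count check before any bytes are summed.
    let too_many: Vec<&[u8]> = vec![b"x".as_slice(); 2_001];
    assert!(validate_batch(&too_many).is_err());
}
```

Note that the byte limit counts the per-entry header overhead, not just payload bytes.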
Phase 2: Planning
Pre-allocates all required blocks and plans write locations:
// From src/wal/runtime/writer.rs:195-250
let mut write_plan: Vec<(Block, u64, usize)> = Vec::new();
let mut batch_idx = 0;
let mut planning_offset = *cur_offset;
while batch_idx < batch.len() {
let data = batch[batch_idx];
let need = (PREFIX_META_SIZE as u64) + (data.len() as u64);
let available = block.limit - planning_offset;
if available >= need {
// Fits in current block
write_plan.push((block.clone(), planning_offset, batch_idx));
planning_offset += need;
batch_idx += 1;
} else {
// Need to seal and allocate new block
FileStateTracker::set_block_unlocked(block.id as usize);
let mut sealed = block.clone();
sealed.used = planning_offset;
sealed.mmap.flush()?;
let _ = self.reader.append_block_to_chain(&self.col, sealed);
// Allocate new block
let new_block = unsafe {
self.allocator.alloc_block(need.max(DEFAULT_BLOCK_SIZE))?
};
revert_info.allocated_block_ids.push(new_block.id);
*block = new_block;
planning_offset = 0;
}
}
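The packing logic above can be sketched in isolation: entries go into the current block while they fit, and a new block is started when one does not. Block and header sizes are illustrative assumptions:

```rust
// Standalone sketch of the planning phase: pack entries into fixed-size
// blocks, "sealing" a block and starting a new one when an entry does
// not fit. Sizes are illustrative, not the crate's actual constants.
const PREFIX_META_SIZE: u64 = 64; // assumed per-entry header size
const BLOCK_SIZE: u64 = 256; // assumed block capacity

/// Returns (block_number, offset_in_block, entry_index) for each entry.
fn plan_writes(start_offset: u64, entries: &[usize]) -> Vec<(usize, u64, usize)> {
    let mut plan = Vec::new();
    let mut block_no = 0usize;
    let mut offset = start_offset;
    for (idx, &len) in entries.iter().enumerate() {
        let need = PREFIX_META_SIZE + len as u64;
        if BLOCK_SIZE - offset < need {
            // Seal the current block and move to a fresh one.
            block_no += 1;
            offset = 0;
        }
        plan.push((block_no, offset, idx));
        offset += need;
    }
    plan
}

fn main() {
    // Three 100-byte entries need 164 bytes each, so only one fits per
    // 256-byte block: each lands at offset 0 of a new block.
    let plan = plan_writes(0, &[100, 100, 100]);
    assert_eq!(plan, vec![(0, 0, 0), (1, 0, 1), (2, 0, 2)]);

    // Two 10-byte entries share a block back to back.
    assert_eq!(plan_writes(0, &[10, 10]), vec![(0, 0, 0), (0, 74, 1)]);
}
```

The real implementation additionally seals and flushes the outgoing block and records allocated block ids for rollback; this sketch only captures the placement arithmetic.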
Phase 3: io_uring Submission (Linux + FD Backend)
On Linux with the FD backend, batch writes use io_uring for parallel I/O:
// From src/wal/runtime/writer.rs:268-293
if USE_FD_BACKEND.load(Ordering::Relaxed) {
match self.submit_batch_via_io_uring(
&write_plan,
batch,
&mut revert_info,
&mut *cur_offset,
planning_offset,
total_bytes_usize,
) {
Ok(()) => return Ok(()),
Err(e) => {
// Fall back to sequential writes if io_uring fails
if e.to_string().contains("io_uring init failed") {
// Continue to fallback path
} else {
return Err(e);
}
}
}
}
Phase 4: Fallback Sequential Path
If io_uring is unavailable or mmap backend is used:
// From src/wal/runtime/writer.rs:296-324
for (blk, offset, data_idx) in write_plan.iter() {
let data = batch[*data_idx];
let next_block_start = blk.offset + blk.limit;
if let Err(e) = blk.write(*offset, data, &self.col, next_block_start) {
// Rollback on error
self.rollback_batch(revert_info)?;
return Err(e);
}
}
// Commit offset
*cur_offset = planning_offset;
// Flush based on schedule
match self.fsync_schedule {
FsyncSchedule::SyncEach => block.mmap.flush()?,
FsyncSchedule::Milliseconds(_) => {
let _ = self.publisher.send(block.file_path.clone());
}
FsyncSchedule::NoFsync => {}
}
Rollback on Failure
If any write fails, the batch is rolled back by releasing the blocks allocated for it:
fn rollback_batch(&self, info: BatchRevertInfo) -> std::io::Result<()> {
// Release blocks allocated for this batch so they can be reused
for block_id in info.allocated_block_ids.iter() {
unsafe { deallocate_block(*block_id)? };
}
Ok(())
}
This ensures atomicity - failed batches leave no partial data.
Batch Reads
batch_read_for_topic
Read multiple entries from a topic in a single operation, up to a specified byte limit.
pub fn batch_read_for_topic(
&self,
col_name: &str,
max_bytes: usize,
checkpoint: bool,
start_offset: Option<u64>,
) -> io::Result<Vec<Entry>>
col_name: The topic name to read from.
max_bytes: Maximum total payload bytes to read. At least one entry is always returned if available.
checkpoint:
- true: Consume entries and advance the cursor (persisted based on the consistency model)
- false: Peek at entries without consuming
start_offset:
- None: Stateful read from the current cursor position
- Some(offset): Stateless read from a specific byte offset (does not affect the cursor)
Example: Stateful Read
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// Write 100 entries
for i in 0..100 {
wal.append_for_topic("events", format!("event-{}", i).as_bytes())?;
}
// Read up to 1MB at a time, checkpointing
let max_bytes = 1024 * 1024; // 1MB
loop {
let entries = wal.batch_read_for_topic("events", max_bytes, true, None)?;
if entries.is_empty() {
break; // No more entries
}
for entry in entries {
println!("Processing: {} bytes", entry.data.len());
// Process entry...
}
}
Example: Stateless Read
Stateless reads don’t affect the shared cursor, useful for replaying or parallel reads:
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// Write some data
wal.batch_append_for_topic("events", &[b"a".as_slice(), b"b".as_slice(), b"c".as_slice()])?;
// Stateless read from offset 0 (doesn't affect cursor)
let entries = wal.batch_read_for_topic("events", 16 * 1024, true, Some(0))?;
assert_eq!(entries.len(), 3);
// Cursor unchanged - stateful read still starts from beginning
let entries = wal.batch_read_for_topic("events", 16 * 1024, true, None)?;
assert_eq!(entries.len(), 3);
How It Works
The batch read implementation (src/wal/runtime/walrus_read.rs:368-1199) proceeds in five phases:
Phase 1: State Preparation
// Stateful: load cursor from shared state
let (chain, cur_idx, cur_off, tail_block_id, tail_offset) =
if let Some(req_offset) = start_offset {
// Stateless: find block containing req_offset
let mut c_idx = 0;
let mut rem = req_offset;
for (i, b) in chain.iter().enumerate() {
if rem < b.used {
c_idx = i;
break;
}
rem -= b.used;
}
// Scan block headers to align to entry boundary...
} else {
// Load from shared cursor
let c_chain = info.chain.clone();
let c_idx = info.cur_block_idx;
let c_off = info.cur_block_offset;
(c_chain, c_idx, c_off, ...)
};
Phase 2: Build Read Plan
Plans which blocks to read and how many bytes from each:
// From src/wal/runtime/walrus_read.rs:676-781
let mut plan: Vec<ReadPlan> = Vec::new();
let mut planned_bytes: usize = 0;
while cur_idx < chain.len() && planned_bytes < max_bytes {
let block = chain[cur_idx].clone();
if cur_off >= block.used {
BlockStateTracker::set_checkpointed_true(block.id as usize);
cur_idx += 1;
cur_off = 0;
continue;
}
let mut want = (max_bytes - planned_bytes) as u64;
// Peek at first entry to ensure we read at least one complete entry
if planned_bytes == 0 && cur_off + (PREFIX_META_SIZE as u64) <= block.used {
let mut meta_buf = [0u8; PREFIX_META_SIZE];
block.mmap.read((block.offset + cur_off) as usize, &mut meta_buf);
// Parse metadata to get entry size...
let required = (PREFIX_META_SIZE + size) as u64;
if required > want {
want = required; // Ensure at least one entry
}
}
let end = block.used.min(cur_off + want);
if end > cur_off {
plan.push(ReadPlan {
blk: block.clone(),
start: cur_off,
end,
is_tail: false,
chain_idx: Some(cur_idx),
});
planned_bytes += (end - cur_off) as usize;
}
cur_idx += 1;
cur_off = 0;
}
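In isolation, the plan-building loop reduces to walking the chain and emitting byte ranges until the budget is met. Block sizes here are illustrative; the peek-ahead that widens the first range to a full entry is omitted for brevity:

```rust
// Standalone sketch of read planning: walk the chain of sealed blocks
// and emit (block_index, start, end) ranges until the byte budget is
// reached. Blocks the cursor has passed are skipped.

/// `used` holds the number of valid bytes in each block; `cur` is the
/// cursor position (block index, offset within that block).
fn plan_reads(used: &[u64], cur: (usize, u64), max_bytes: u64) -> Vec<(usize, u64, u64)> {
    let (mut idx, mut off) = cur;
    let mut plan = Vec::new();
    let mut planned = 0u64;
    while idx < used.len() && planned < max_bytes {
        if off >= used[idx] {
            // Block exhausted; move to the next one.
            idx += 1;
            off = 0;
            continue;
        }
        let want = max_bytes - planned;
        let end = used[idx].min(off + want);
        plan.push((idx, off, end));
        planned += end - off;
        idx += 1;
        off = 0;
    }
    plan
}

fn main() {
    // Two blocks with 100 and 200 valid bytes; a 150-byte budget starting
    // at (0, 40) takes the rest of block 0 and 90 bytes of block 1.
    let plan = plan_reads(&[100, 200], (0, 40), 150);
    assert_eq!(plan, vec![(0, 40, 100), (1, 0, 90)]);

    // A cursor at the end of the last block yields an empty plan.
    assert!(plan_reads(&[100], (0, 100), 50).is_empty());
}
```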
Phase 3: io_uring Batch Read (Linux + FD Backend)
On Linux with FD backend, reads use io_uring for parallel I/O:
// From src/wal/runtime/walrus_read.rs:872-959
#[cfg(target_os = "linux")]
let buffers = if USE_FD_BACKEND.load(Ordering::Relaxed) {
let ring_size = (plan.len() + 64).min(4096) as u32;
let ring = match io_uring::IoUring::new(ring_size) {
Ok(r) => Some(r),
Err(_) => None, // Fall back to mmap
};
if let Some(mut ring) = ring {
let mut temp_buffers: Vec<Vec<u8>> = vec![Vec::new(); plan.len()];
// Submit all reads to io_uring
for (plan_idx, read_plan) in plan.iter().enumerate() {
let size = (read_plan.end - read_plan.start) as usize;
let mut buffer = vec![0u8; size];
let file_offset = (read_plan.blk.offset + read_plan.start) as usize;
let fd = io_uring::types::Fd(
fd_backend.file().as_raw_fd()
);
let read_op = io_uring::opcode::Read::new(
fd,
buffer.as_mut_ptr(),
size as u32
)
.offset(file_offset as u64)
.build()
.user_data(plan_idx as u64);
temp_buffers[plan_idx] = buffer;
unsafe { ring.submission().push(&read_op)?; }
}
// Submit and wait for all reads
ring.submit_and_wait(plan.len())?;
// Collect results
for _ in 0..plan.len() {
if let Some(cqe) = ring.completion().next() {
let plan_idx = cqe.user_data() as usize;
let got = cqe.result();
if got < 0 {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("io_uring read failed: {}", got),
));
}
}
}
temp_buffers
} else {
// Fall back to mmap reads
}
}
Phase 4: Parse Entries
Parse entries from buffers, verifying checksums:
// From src/wal/runtime/walrus_read.rs:984-1103
let mut entries = Vec::new();
let mut total_data_bytes = 0usize;
for (plan_idx, read_plan) in plan.iter().enumerate() {
if entries.len() >= MAX_BATCH_ENTRIES {
break; // Entry limit reached
}
let buffer = &buffers[plan_idx];
let mut buf_offset = 0usize;
while buf_offset < buffer.len() {
if entries.len() >= MAX_BATCH_ENTRIES {
break;
}
// Parse metadata
let meta_len = (buffer[buf_offset] as usize)
| ((buffer[buf_offset + 1] as usize) << 8);
if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
break; // Invalid header
}
let mut aligned = AlignedVec::with_capacity(meta_len);
aligned.extend_from_slice(&buffer[buf_offset + 2..buf_offset + 2 + meta_len]);
let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
let meta: Metadata = archived.deserialize(&mut rkyv::Infallible)?;
let data_size = meta.read_size;
let entry_consumed = PREFIX_META_SIZE + data_size;
// Check byte budget (but always allow at least one entry)
let next_total = total_data_bytes.checked_add(data_size).unwrap_or(usize::MAX);
if next_total > max_bytes && !entries.is_empty() {
break;
}
// Extract and verify data
let data_start = buf_offset + PREFIX_META_SIZE;
let data_end = data_start + data_size;
let data_slice = &buffer[data_start..data_end];
if checksum64(data_slice) != meta.checksum {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"checksum mismatch in batch read",
));
}
entries.push(Entry { data: data_slice.to_vec() });
total_data_bytes = next_total;
buf_offset += entry_consumed;
}
}
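A simplified, self-contained version of this parse loop, using a 2-byte length prefix and a toy checksum in place of the crate's rkyv-encoded metadata header (both are assumptions for illustration):

```rust
use std::convert::TryInto;

// Toy stand-in for the crate's checksum; illustration only.
fn checksum64(data: &[u8]) -> u64 {
    data.iter()
        .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u64))
}

// Assumed wire layout for this sketch: [len: u16 LE][checksum: u64 LE][payload].
fn encode(entries: &[&[u8]]) -> Vec<u8> {
    let mut buf = Vec::new();
    for e in entries {
        buf.extend_from_slice(&(e.len() as u16).to_le_bytes());
        buf.extend_from_slice(&checksum64(e).to_le_bytes());
        buf.extend_from_slice(e);
    }
    buf
}

fn parse(buf: &[u8]) -> Result<Vec<Vec<u8>>, String> {
    let mut out = Vec::new();
    let mut off = 0;
    while off + 10 <= buf.len() {
        let len = u16::from_le_bytes([buf[off], buf[off + 1]]) as usize;
        let sum = u64::from_le_bytes(buf[off + 2..off + 10].try_into().unwrap());
        if off + 10 + len > buf.len() {
            break; // truncated entry
        }
        let data = &buf[off + 10..off + 10 + len];
        // Verify the payload checksum before accepting the entry.
        if checksum64(data) != sum {
            return Err("checksum mismatch in batch read".to_string());
        }
        out.push(data.to_vec());
        off += 10 + len;
    }
    Ok(out)
}

fn main() {
    let buf = encode(&[b"hello".as_slice(), b"world".as_slice()]);
    assert_eq!(parse(&buf).unwrap(), vec![b"hello".to_vec(), b"world".to_vec()]);

    // Flipping a payload byte is caught by the checksum.
    let mut bad = buf.clone();
    let n = bad.len();
    bad[n - 1] ^= 1;
    assert!(parse(&bad).is_err());
}
```

The real parser additionally enforces the entry-count limit and the at-least-one-entry budget rule shown above.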
Phase 5: Commit Progress
If checkpointing, update cursor position:
// From src/wal/runtime/walrus_read.rs:1106-1190
if checkpoint {
if saw_tail {
// Read from tail, persist with tail flag
info.tail_block_id = final_tail_block_id;
info.tail_offset = final_tail_offset;
target = PersistTarget::Tail {
blk_id: final_tail_block_id,
off: final_tail_offset,
};
} else {
// Read from sealed blocks
info.cur_block_idx = final_block_idx;
info.cur_block_offset = final_block_offset;
target = PersistTarget::Sealed {
idx: final_block_idx as u64,
off: final_block_offset,
};
}
}
// Persist to index
if let Ok(mut idx_guard) = self.read_offset_index.write() {
match target {
PersistTarget::Tail { blk_id, off } => {
let _ = idx_guard.set(col_name.to_string(), blk_id | TAIL_FLAG, off);
}
PersistTarget::Sealed { idx, off } => {
let _ = idx_guard.set(col_name.to_string(), idx, off);
}
_ => {}
}
}
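The tail-flag encoding can be sketched on its own. The flag value here is an illustrative assumption, chosen as the high bit of the block id:

```rust
// Standalone sketch of cursor persistence: tail positions are tagged by
// OR-ing a flag bit into the block id before storing (blk_id | TAIL_FLAG),
// so one index entry can represent either a sealed or a tail position.
const TAIL_FLAG: u64 = 1 << 63; // assumed flag value for illustration

fn encode_cursor(tail: bool, blk: u64, off: u64) -> (u64, u64) {
    if tail { (blk | TAIL_FLAG, off) } else { (blk, off) }
}

fn decode_cursor(key: u64, off: u64) -> (bool, u64, u64) {
    ((key & TAIL_FLAG) != 0, key & !TAIL_FLAG, off)
}

fn main() {
    // A tail cursor round-trips with its flag intact.
    let (key, off) = encode_cursor(true, 7, 128);
    assert_eq!(decode_cursor(key, off), (true, 7, 128));

    // A sealed-block cursor stores the index unmodified.
    let (key, off) = encode_cursor(false, 7, 128);
    assert_eq!(decode_cursor(key, off), (false, 7, 128));
}
```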
Minimum Entry Guarantee
batch_read_for_topic always returns at least 1 entry if available, even if that entry exceeds max_bytes:
// From src/wal/runtime/walrus_read.rs:1039-1045
let next_total = total_data_bytes.checked_add(data_size).unwrap_or(usize::MAX);
if next_total > max_bytes && !entries.is_empty() {
break; // Stop if over budget and we have at least one entry
}
This ensures forward progress - you can always read the next entry regardless of size.
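The budget check reduces to a small loop that always admits the first entry; this standalone sketch demonstrates the guarantee with illustrative entry sizes:

```rust
// Standalone sketch of the minimum-entry guarantee: the first entry is
// always taken, even when it alone exceeds max_bytes, so a reader can
// always make forward progress.

/// Returns how many of `sizes` fit within `max_bytes`, always taking at
/// least the first entry.
fn take_within_budget(sizes: &[usize], max_bytes: usize) -> usize {
    let mut total = 0usize;
    let mut taken = 0usize;
    for &size in sizes {
        let next_total = total.saturating_add(size);
        // Stop only if over budget AND we already hold at least one entry.
        if next_total > max_bytes && taken > 0 {
            break;
        }
        total = next_total;
        taken += 1;
    }
    taken
}

fn main() {
    // A single 5000-byte entry is returned even with a 1024-byte budget.
    assert_eq!(take_within_budget(&[5000, 100], 1024), 1);
    // Normal case: entries accumulate until the budget is hit.
    assert_eq!(take_within_budget(&[400, 400, 400], 1024), 2);
}
```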
io_uring vs Sequential I/O
On Linux with FD backend, io_uring provides significant performance benefits:
Batch Write (1000 entries)
- Sequential: ~50,000 writes/sec
- io_uring: ~200,000 writes/sec (4x improvement)
Batch Read (1000 entries)
- Sequential: ~100,000 reads/sec
- io_uring: ~500,000 reads/sec (5x improvement)
Performance benefits scale with batch size and number of blocks accessed.
Backend Comparison
| Operation | FD + io_uring (Linux) | FD Sequential | Mmap |
|---|---|---|---|
| Batch Write (1000) | 200k/sec | 50k/sec | 50k/sec |
| Batch Read (1000) | 500k/sec | 100k/sec | 100k/sec |
| Single Write | 100k/sec | 100k/sec | 95k/sec |
| Single Read | 150k/sec | 150k/sec | 140k/sec |
Best Practices
Batch Sizing
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// Good: batch size around 100-1000 entries
let batch: Vec<Vec<u8>> = (0..500)
.map(|i| format!("event-{}", i).into_bytes())
.collect();
let batch_refs: Vec<&[u8]> = batch.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("events", &batch_refs)?;
// Avoid: too small batches (overhead dominates)
for i in 0..1000 {
wal.batch_append_for_topic("events", &[format!("event-{}", i).as_bytes()])?;
}
// Avoid: batches near 2000 entry limit (may hit limits)
let huge_batch: Vec<Vec<u8>> = (0..1999).map(|_| vec![0u8; 100]).collect();
// This works but leaves no margin for error
Read Buffer Sizing
use walrus_rust::Walrus;
let wal = Walrus::new()?;
// Good: read buffer sized to expected workload
let entries_per_batch = 100;
let avg_entry_size = 1024;
let max_bytes = entries_per_batch * avg_entry_size;
loop {
let entries = wal.batch_read_for_topic("events", max_bytes, true, None)?;
if entries.is_empty() {
break;
}
// Process entries...
}
// Avoid: too small buffer (many iterations needed)
let max_bytes = 100; // Too small!
// Avoid: excessively large buffer (wasted memory)
let max_bytes = 100 * 1024 * 1024; // 100MB - probably too large
Error Handling
use walrus_rust::Walrus;
use std::io::ErrorKind;
let wal = Walrus::new()?;
// Handle batch write errors
let batch = vec![b"data".as_slice(); 2001]; // Too many entries
match wal.batch_append_for_topic("topic", &batch) {
Ok(()) => println!("Success"),
Err(e) if e.kind() == ErrorKind::InvalidInput => {
eprintln!("Batch too large: {}", e);
}
Err(e) => eprintln!("Batch write failed: {}", e),
}
// Handle batch read errors
match wal.batch_read_for_topic("topic", 1024, true, None) {
Ok(entries) => println!("Read {} entries", entries.len()),
Err(e) if e.kind() == ErrorKind::InvalidData => {
eprintln!("Checksum mismatch in batch read");
}
Err(e) => eprintln!("Batch read failed: {}", e),
}
Next Steps