Skip to main content
Change streams allow you to subscribe to real-time notifications when documents are inserted, updated, or deleted in a collection.

Overview

Change streams use the observer pattern to notify your application of data changes:
  • Insert - New document created
  • Update - Existing document modified
  • Delete - Document removed
You can optionally filter events using queries.

Basic usage

Watch all changes

Subscribe to all changes in a collection.
use jasonisnthappy::Database;
use serde_json::json; // needed for the `json!` document literals below
use std::thread;
use std::time::Duration;

let db = Database::open("my.db")?;
let users = db.collection("users");

// Subscribe to all changes: `handle` keeps the subscription alive,
// `rx` is the receiving end on which events arrive.
let (handle, rx) = users.watch().subscribe()?;

// Spawn a thread to handle events; the loop ends once `recv` returns an error
let event_thread = thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        println!("Change detected!");
        println!("  Operation: {:?}", event.operation);
        println!("  Document ID: {}", event.doc_id);

        // `document` is optional (delete events carry no document)
        if let Some(doc) = event.document {
            println!("  Data: {}", serde_json::to_string_pretty(&doc).unwrap());
        }
    }
});

// Make some changes
users.insert(json!({"name": "Alice", "age": 30}))?;
users.update("name is \"Alice\"", json!({"age": 31}))?;

// Wait for events
thread::sleep(Duration::from_millis(100));

// Cleanup
drop(handle);  // Automatically unsubscribes
event_thread.join().ok();

Watch with filter

Only receive events matching a query.
// Only watch changes to users in NYC
let (handle, rx) = users.watch()
    .filter("city is \"NYC\"")
    .subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        println!("NYC user changed: {}", event.doc_id);
    }
});

// This triggers an event (NYC user)
users.insert(json!({"name": "Alice", "city": "NYC"}))?;

// This does NOT trigger an event (LA user)
users.insert(json!({"name": "Bob", "city": "LA"}))?;

Event types

Insert events

Triggered when a new document is created.
use jasonisnthappy::ChangeOperation;

let (handle, rx) = users.watch().subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        // Only insert events are of interest here
        if event.operation != ChangeOperation::Insert {
            continue;
        }

        // `document` is an Option: skip the event instead of panicking the
        // handler thread if it is ever absent.
        if let Some(doc) = event.document {
            // `as_str()` yields the bare text; printing the JSON value directly
            // would include the surrounding quotes.
            // NOTE(review): assumes `doc` is a serde_json::Value - confirm.
            let name = doc["name"].as_str().unwrap_or("<unnamed>");
            println!("New user: {}", name);
        }
    }
});

users.insert(json!({"name": "Alice", "age": 30}))?;
// Output: "New user: Alice"

Update events

Triggered when a document is modified.
use jasonisnthappy::ChangeOperation;

let (handle, rx) = users.watch().subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        if event.operation == ChangeOperation::Update {
            println!("User {} updated", event.doc_id);
            if let Some(doc) = event.document {
                println!("  New data: {:?}", doc);
            }
        }
    }
});

let id = users.insert(json!({"name": "Alice", "age": 30}))?;
users.update_by_id(&id, json!({"age": 31}))?;
// Output: "User <id> updated"

Delete events

Triggered when a document is removed.
let (handle, rx) = users.watch().subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        if event.operation == ChangeOperation::Delete {
            println!("User {} deleted", event.doc_id);
            // Note: event.document is None for deletes
        }
    }
});

let id = users.insert(json!({"name": "Alice"}))?;
users.delete_by_id(&id)?;
// Output: "User <id> deleted"

Filtering events

Simple filters

Use the query language to filter events.
// Only watch premium users
let (handle, rx) = users.watch()
    .filter("tier is \"premium\"")
    .subscribe()?;

// Only watch users over 18
let (handle, rx) = users.watch()
    .filter("age >= 18")
    .subscribe()?;

// Complex filters
let (handle, rx) = users.watch()
    .filter("age >= 18 and city is \"NYC\"")
    .subscribe()?;
Watchers with a filter do not receive delete events, because a delete carries no document to match the filter against.

Multiple watchers

You can have multiple watchers with different filters.
// Watch all changes (no filter)
let (handle1, rx1) = users.watch().subscribe()?;

// Watch only premium users
let (handle2, rx2) = users.watch()
    .filter("tier is \"premium\"")
    .subscribe()?;

// Watch only NYC users
let (handle3, rx3) = users.watch()
    .filter("city is \"NYC\"")
    .subscribe()?;

// Each watcher is evaluated independently; this document matches all three
users.insert(json!({
    "name": "Alice",
    "tier": "premium",
    "city": "NYC"
}))?;
// rx1, rx2, and rx3 all receive this event

Watch handle

The WatchHandle uses RAII to automatically unsubscribe when dropped.

Automatic cleanup

{
    let (handle, rx) = users.watch().subscribe()?;
    // ... use rx ...
}  // `handle` (and `rx`) go out of scope here - dropping the handle unsubscribes

// From this point on the watcher is unsubscribed, so no more events are sent for it

Manual unsubscribe

let (handle, rx) = users.watch().subscribe()?;

// ... later ...
handle.unsubscribe();  // Explicitly unsubscribe

Handle metadata

let (handle, rx) = users.watch()
    .filter("age > 18")
    .subscribe()?;

println!("Watcher ID: {}", handle.id());
println!("Watching: {}", handle.collection());

Real-world examples

Audit logging

Log all changes to sensitive collections.
use std::fs::OpenOptions;
use std::io::Write;

let db = Database::open("app.db")?;
let users = db.collection("users");

let (handle, rx) = users.watch().subscribe()?;

thread::spawn(move || {
    // Open (or create) the log in append mode. Report a failure instead of
    // panicking the logging thread.
    let mut log_file = match OpenOptions::new()
        .create(true)
        .append(true)
        .open("audit.log")
    {
        Ok(file) => file,
        Err(e) => {
            eprintln!("Failed to open audit.log: {}", e);
            return;
        }
    };

    while let Ok(event) = rx.recv() {
        let log_entry = format!(
            "[{}] {:?} on document {} in {}\n",
            chrono::Utc::now(),
            event.operation,
            event.doc_id,
            event.collection
        );
        // An audit trail must not lose entries silently: surface write errors.
        if let Err(e) = log_file.write_all(log_entry.as_bytes()) {
            eprintln!("Failed to write audit entry: {}", e);
        }
    }
});

Cache invalidation

Invalidate cached data when underlying documents change.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
// NOTE(review): `Value` is assumed to be serde_json's JSON value type - confirm.
use serde_json::Value;

// Cache keyed by document ID, shared between the application and the watcher thread
let cache: Arc<Mutex<HashMap<String, Value>>> = Arc::new(Mutex::new(HashMap::new()));
let cache_clone = cache.clone();

let (handle, rx) = users.watch().subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        let mut cache = cache_clone.lock().unwrap();

        match event.operation {
            // Inserts and updates carry the current document: store it
            ChangeOperation::Insert | ChangeOperation::Update => {
                if let Some(doc) = event.document {
                    cache.insert(event.doc_id, doc);
                }
            }
            // Deletes only identify the document: evict it
            ChangeOperation::Delete => {
                cache.remove(&event.doc_id);
            }
        }
    }
});

Real-time notifications

Send push notifications when data changes.
// Only completed orders produce events for this watcher
let (handle, rx) = orders.watch()
    .filter("status is \"completed\"")
    .subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        if let Some(order) = event.document {
            // An order without a string `customer_id` is skipped rather than
            // unwrapped: a panic here would stop all later notifications.
            if let Some(customer_id) = order["customer_id"].as_str() {
                send_notification(
                    customer_id,
                    "Your order has been completed!"
                );
            } else {
                eprintln!("Order {} has no customer_id; skipping notification", event.doc_id);
            }
        }
    }
});

Data synchronization

Sync changes to another database or service.
let (handle, rx) = products.watch().subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        match event.operation {
            ChangeOperation::Insert | ChangeOperation::Update => {
                if let Some(doc) = event.document {
                    // Sync to Elasticsearch, Redis, etc.
                    sync_to_search_engine(&event.doc_id, &doc);
                }
            }
            ChangeOperation::Delete => {
                // Remove from external service
                remove_from_search_engine(&event.doc_id);
            }
        }
    }
});

Materialized views

Maintain derived data automatically.
// Per-city user counts, maintained from the change stream
let stats: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
let stats_clone = stats.clone();

let (handle, rx) = users.watch().subscribe()?;

thread::spawn(move || {
    // Delete events carry no document, so remember which city each counted
    // document belongs to; otherwise a delete could never be attributed.
    let mut city_by_doc: HashMap<String, String> = HashMap::new();

    while let Ok(event) = rx.recv() {
        let mut stats = stats_clone.lock().unwrap();

        match event.operation {
            ChangeOperation::Insert => {
                if let Some(doc) = event.document {
                    let city = doc["city"].as_str().unwrap_or("unknown").to_string();
                    *stats.entry(city.clone()).or_insert(0) += 1;
                    city_by_doc.insert(event.doc_id, city);
                }
            }
            ChangeOperation::Update => {
                if let Some(doc) = event.document {
                    let new_city = doc["city"].as_str().unwrap_or("unknown");
                    // Only documents counted on insert are tracked; for others
                    // the previous city is unknown, so they are left alone.
                    if let Some(tracked) = city_by_doc.get_mut(&event.doc_id) {
                        if tracked.as_str() != new_city {
                            *stats.entry(tracked.clone()).or_insert(0) -= 1;
                            *stats.entry(new_city.to_string()).or_insert(0) += 1;
                            *tracked = new_city.to_string();
                        }
                    }
                }
            }
            ChangeOperation::Delete => {
                // Decrement the city recorded when the document was counted
                if let Some(city) = city_by_doc.remove(&event.doc_id) {
                    *stats.entry(city).or_insert(0) -= 1;
                }
            }
        }
    }
});

Performance considerations

Use filters to reduce event volume:
// Bad: receive all events, filter in application
let (handle, rx) = users.watch().subscribe()?;
// ... filter events in your code ...

// Good: filter at the source
let (handle, rx) = users.watch()
    .filter("tier is \"premium\"")
    .subscribe()?;
Process events asynchronously:
let (handle, rx) = users.watch().subscribe()?;

// Spawn dedicated thread for event processing
thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        // Process event without blocking
        process_event_async(event);
    }
});
Event delivery guarantees:
  • Events are sent at most once (no retries)
  • If the receiver channel is full or closed, events are dropped
  • No ordering guarantees across multiple watchers
  • Events are sent after the transaction commits

Common patterns

Temporary watcher

// Watch only during a specific operation
{
    let (handle, rx) = users.watch().subscribe()?;
    
    // Perform operation
    perform_bulk_import()?;
    
    // Process events
    while let Ok(event) = rx.try_recv() {
        println!("Imported: {}", event.doc_id);
    }
}  // Automatically unsubscribe

Error handling

let (handle, rx) = users.watch().subscribe()?;

thread::spawn(move || {
    loop {
        match rx.recv() {
            Ok(event) => {
                if let Err(e) = process_event(&event) {
                    eprintln!("Failed to process event: {}", e);
                    // Log, retry, or alert
                }
            }
            Err(_) => {
                // Channel closed - watcher unsubscribed
                break;
            }
        }
    }
});

Multiple collections

let users = db.collection("users");
let orders = db.collection("orders");

let (user_handle, user_rx) = users.watch().subscribe()?;
let (order_handle, order_rx) = orders.watch().subscribe()?;

// NOTE(review): `select!` with `recv(rx) -> value` arms is crossbeam-channel
// syntax - confirm the receiver type and bring the macro into scope accordingly.
thread::spawn(move || {
    loop {
        select! {
            recv(user_rx) -> event => {
                match event {
                    Ok(event) => { handle_user_change(event); }
                    // Channel closed (watcher unsubscribed): stop instead of
                    // spinning on a receiver that will only ever return errors.
                    Err(_) => break,
                }
            }
            recv(order_rx) -> event => {
                match event {
                    Ok(event) => { handle_order_change(event); }
                    Err(_) => break,
                }
            }
        }
    }
});

Best practices

Keep event handlers fast: Don’t block event processing with slow operations.
thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        // Bad: slow operation blocks event queue
        // (shown for contrast only - a real handler would omit this call)
        make_slow_api_call(&event);
        
        // Good: queue for async processing
        // (the event is moved into `event_queue`; the send result is discarded)
        event_queue.send(event).ok();
    }
});
Clean up handles: Always ensure watchers are unsubscribed to prevent memory leaks.
// Collect the handles so they can be released together.
// `mut` is required: the vector is filled with `push` below.
let mut handles = Vec::new();
// `.0` keeps only the handle; the receiver half of each pair is dropped here.
handles.push(users.watch().subscribe()?.0);
handles.push(orders.watch().subscribe()?.0);

// Later: clean up all watchers
for handle in handles {
    handle.unsubscribe();
}

Next steps

CRUD operations

Operations that trigger events

Querying

Filter syntax for watchers

Aggregation

Analyze change stream events

Performance

Optimize event processing

Build docs developers (and LLMs) love