Skip to main content
The watch API allows you to subscribe to real-time notifications when documents are inserted, updated, or deleted in a collection.

Creating a watcher

Get a watch builder from a collection:
let users = db.collection("users");
let watcher = users.watch();

WatchBuilder methods

filter

Add a filter to only receive events for documents matching a query.
pub fn filter(self, query: &str) -> Self
query
&str
required
Query string to filter change events
Example:
let (handle, rx) = users.watch()
    .filter("age > 18 and city is \"NYC\"")
    .subscribe()?;

subscribe

Subscribe to changes and get a receiver channel.
pub fn subscribe(self) -> Result<(WatchHandle, Receiver<ChangeEvent>)>
Returns: Result<(WatchHandle, Receiver<ChangeEvent>)>
WatchHandle
struct
Handle that automatically unsubscribes when dropped (RAII)
Receiver<ChangeEvent>
mpsc::Receiver
Channel receiver for change events
Example:
let (handle, rx) = users.watch().subscribe()?;

// Use the receiver to get events
while let Ok(event) = rx.recv() {
    println!("Change detected: {:?}", event);
}

// Automatically unsubscribes when handle is dropped

ChangeEvent

Event emitted when a document changes.
collection
String
Name of the collection where the change occurred
operation
ChangeOperation
Type of operation (Insert, Update, or Delete)
doc_id
String
The document ID
document
Option<Value>
The document data (None for Delete operations)

ChangeOperation

Type of change operation.
ChangeOperation::Insert
enum variant
A document was inserted
ChangeOperation::Update
enum variant
A document was updated
ChangeOperation::Delete
enum variant
A document was deleted

WatchHandle

Handle returned from subscribe() that manages the watcher lifecycle.

id

Get the unique watcher ID.
pub fn id(&self) -> &str
Returns: &str - The watcher ID

collection

Get the collection being watched.
pub fn collection(&self) -> &str
Returns: &str - The collection name

unsubscribe

Manually unsubscribe (equivalent to dropping the handle).
pub fn unsubscribe(self)
Example:
let (handle, rx) = users.watch().subscribe()?;

// Later, explicitly unsubscribe
handle.unsubscribe();

Examples

Basic watching

use jasonisnthappy::ChangeOperation;

let users = db.collection("users");
let (handle, rx) = users.watch().subscribe()?;

// Spawn a thread to listen for changes
std::thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        match event.operation {
            ChangeOperation::Insert => {
                println!("New user created: {}", event.doc_id);
                if let Some(doc) = event.document {
                    println!("  Name: {}", doc["name"]);
                }
            }
            ChangeOperation::Update => {
                println!("User updated: {}", event.doc_id);
            }
            ChangeOperation::Delete => {
                println!("User deleted: {}", event.doc_id);
            }
        }
    }
});

// Make changes in main thread
users.insert(json!({"name": "Alice", "age": 30}))?;

// Keep handle alive, or call handle.unsubscribe() to stop

Filtered watching

let users = db.collection("users");

// Only watch for adult users in NYC
let (handle, rx) = users.watch()
    .filter("age >= 18 and city is \"NYC\"")
    .subscribe()?;

std::thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        println!("Adult NYC user changed: {}", event.doc_id);
    }
});

// These will trigger events
users.insert(json!({"name": "Alice", "age": 30, "city": "NYC"}))?;
users.insert(json!({"name": "Bob", "age": 25, "city": "NYC"}))?;

// This will NOT trigger an event (doesn't match filter)
users.insert(json!({"name": "Charlie", "age": 17, "city": "NYC"}))?;
users.insert(json!({"name": "Dave", "age": 30, "city": "LA"}))?;

Multiple watchers

let users = db.collection("users");

// Watcher 1: All changes
let (handle1, rx1) = users.watch().subscribe()?;

// Watcher 2: Only premium users
let (handle2, rx2) = users.watch()
    .filter("plan is \"premium\"")
    .subscribe()?;

// Watcher 3: Only users in specific city
let (handle3, rx3) = users.watch()
    .filter("city is \"San Francisco\"")
    .subscribe()?;

// Each watcher gets events matching their filter
std::thread::spawn(move || {
    while let Ok(event) = rx1.recv() {
        println!("All changes: {}", event.doc_id);
    }
});

std::thread::spawn(move || {
    while let Ok(event) = rx2.recv() {
        println!("Premium user change: {}", event.doc_id);
    }
});

std::thread::spawn(move || {
    while let Ok(event) = rx3.recv() {
        println!("SF user change: {}", event.doc_id);
    }
});

Automatic unsubscribe

{
    let (handle, rx) = users.watch().subscribe()?;
    
    // Watcher is active here
    std::thread::spawn(move || {
        while let Ok(event) = rx.recv() {
            println!("Change: {}", event.doc_id);
        }
    });
    
    // handle is automatically unsubscribed when it goes out of scope
}

// Watcher is now inactive

Explicit unsubscribe

let (handle, rx) = users.watch().subscribe()?;

let thread_handle = std::thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        println!("Change: {}", event.doc_id);
    }
    println!("Watcher closed");
});

// Do some work...
std::thread::sleep(Duration::from_secs(5));

// Explicitly unsubscribe
handle.unsubscribe();

// Thread will exit when channel closes
thread_handle.join().unwrap();

Audit logging

use std::fs::OpenOptions;
use std::io::Write;

let users = db.collection("users");
let (handle, rx) = users.watch().subscribe()?;

std::thread::spawn(move || {
    let mut log_file = OpenOptions::new()
        .create(true)
        .append(true)
        .open("user_changes.log")
        .unwrap();
    
    while let Ok(event) = rx.recv() {
        let timestamp = chrono::Utc::now();
        let log_entry = format!(
            "[{}] {:?}: {} in collection {}\n",
            timestamp,
            event.operation,
            event.doc_id,
            event.collection
        );
        
        log_file.write_all(log_entry.as_bytes()).unwrap();
        log_file.flush().unwrap();
    }
});

Cache invalidation

use std::sync::{Arc, RwLock};
use std::collections::HashMap;

let cache: Arc<RwLock<HashMap<String, Value>>> = Arc::new(RwLock::new(HashMap::new()));
let cache_clone = cache.clone();

let users = db.collection("users");
let (handle, rx) = users.watch().subscribe()?;

std::thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        let mut cache = cache_clone.write().unwrap();
        
        match event.operation {
            ChangeOperation::Insert | ChangeOperation::Update => {
                if let Some(doc) = event.document {
                    cache.insert(event.doc_id, doc);
                }
            }
            ChangeOperation::Delete => {
                cache.remove(&event.doc_id);
            }
        }
    }
});

Notes

  • Watchers are automatically cleaned up when the WatchHandle is dropped
  • Delete operations have document: None in the ChangeEvent
  • Filters on delete operations will not match (no document to filter)
  • Multiple watchers can be active on the same collection
  • Changes are emitted after transaction commit
  • Watchers receive events for all operations, including those in transactions

Build docs developers (and LLMs) love