The watch API allows you to subscribe to real-time notifications when documents are inserted, updated, or deleted in a collection.
Creating a watcher
Get a watch builder from a collection:
let users = db.collection("users");
let watcher = users.watch();
WatchBuilder methods
filter
Add a filter to only receive events for documents matching a query.
pub fn filter(self, query: &str) -> Self
Query string to filter change events
Example:
let (handle, rx) = users.watch()
.filter("age > 18 and city is \"NYC\"")
.subscribe()?;
subscribe
Subscribe to changes and get a receiver channel.
pub fn subscribe(self) -> Result<(WatchHandle, Receiver<ChangeEvent>)>
Returns: Result<(WatchHandle, Receiver<ChangeEvent>)>
Handle that automatically unsubscribes when dropped (RAII)
Channel receiver for change events
Example:
let (handle, rx) = users.watch().subscribe()?;
// Use the receiver to get events
while let Ok(event) = rx.recv() {
println!("Change detected: {:?}", event);
}
// Automatically unsubscribes when handle is dropped
ChangeEvent
Event emitted when a document changes.
Name of the collection where the change occurred
Type of operation (Insert, Update, or Delete)
The document data (None for Delete operations)
ChangeOperation
Type of change operation.
WatchHandle
Handle returned from subscribe() that manages the watcher lifecycle.
Get the unique watcher ID.
Returns: &str - The watcher ID
collection
Get the collection being watched.
pub fn collection(&self) -> &str
Returns: &str - The collection name
unsubscribe
Manually unsubscribe (equivalent to dropping the handle).
Example:
let (handle, rx) = users.watch().subscribe()?;
// Later, explicitly unsubscribe
handle.unsubscribe();
Examples
Basic watching
use jasonisnthappy::ChangeOperation;
let users = db.collection("users");
let (handle, rx) = users.watch().subscribe()?;
// Spawn a thread to listen for changes
std::thread::spawn(move || {
while let Ok(event) = rx.recv() {
match event.operation {
ChangeOperation::Insert => {
println!("New user created: {}", event.doc_id);
if let Some(doc) = event.document {
println!(" Name: {}", doc["name"]);
}
}
ChangeOperation::Update => {
println!("User updated: {}", event.doc_id);
}
ChangeOperation::Delete => {
println!("User deleted: {}", event.doc_id);
}
}
}
});
// Make changes in main thread
users.insert(json!({"name": "Alice", "age": 30}))?;
// Keep handle alive, or call handle.unsubscribe() to stop
Filtered watching
let users = db.collection("users");
// Only watch for adult users in NYC
let (handle, rx) = users.watch()
.filter("age >= 18 and city is \"NYC\"")
.subscribe()?;
std::thread::spawn(move || {
while let Ok(event) = rx.recv() {
println!("Adult NYC user changed: {}", event.doc_id);
}
});
// These will trigger events
users.insert(json!({"name": "Alice", "age": 30, "city": "NYC"}))?;
users.insert(json!({"name": "Bob", "age": 25, "city": "NYC"}))?;
// This will NOT trigger an event (doesn't match filter)
users.insert(json!({"name": "Charlie", "age": 17, "city": "NYC"}))?;
users.insert(json!({"name": "Dave", "age": 30, "city": "LA"}))?;
Multiple watchers
let users = db.collection("users");
// Watcher 1: All changes
let (handle1, rx1) = users.watch().subscribe()?;
// Watcher 2: Only premium users
let (handle2, rx2) = users.watch()
.filter("plan is \"premium\"")
.subscribe()?;
// Watcher 3: Only users in specific city
let (handle3, rx3) = users.watch()
.filter("city is \"San Francisco\"")
.subscribe()?;
// Each watcher gets events matching their filter
std::thread::spawn(move || {
while let Ok(event) = rx1.recv() {
println!("All changes: {}", event.doc_id);
}
});
std::thread::spawn(move || {
while let Ok(event) = rx2.recv() {
println!("Premium user change: {}", event.doc_id);
}
});
std::thread::spawn(move || {
while let Ok(event) = rx3.recv() {
println!("SF user change: {}", event.doc_id);
}
});
Automatic unsubscribe
{
let (handle, rx) = users.watch().subscribe()?;
// Watcher is active here
std::thread::spawn(move || {
while let Ok(event) = rx.recv() {
println!("Change: {}", event.doc_id);
}
});
// handle is automatically unsubscribed when it goes out of scope
}
// Watcher is now inactive
Explicit unsubscribe
let (handle, rx) = users.watch().subscribe()?;
let thread_handle = std::thread::spawn(move || {
while let Ok(event) = rx.recv() {
println!("Change: {}", event.doc_id);
}
println!("Watcher closed");
});
// Do some work...
std::thread::sleep(Duration::from_secs(5));
// Explicitly unsubscribe
handle.unsubscribe();
// Thread will exit when channel closes
thread_handle.join().unwrap();
Audit logging
use std::fs::OpenOptions;
use std::io::Write;
let users = db.collection("users");
let (handle, rx) = users.watch().subscribe()?;
std::thread::spawn(move || {
let mut log_file = OpenOptions::new()
.create(true)
.append(true)
.open("user_changes.log")
.unwrap();
while let Ok(event) = rx.recv() {
let timestamp = chrono::Utc::now();
let log_entry = format!(
"[{}] {:?}: {} in collection {}\n",
timestamp,
event.operation,
event.doc_id,
event.collection
);
log_file.write_all(log_entry.as_bytes()).unwrap();
log_file.flush().unwrap();
}
});
Cache invalidation
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
let cache: Arc<RwLock<HashMap<String, Value>>> = Arc::new(RwLock::new(HashMap::new()));
let cache_clone = cache.clone();
let users = db.collection("users");
let (handle, rx) = users.watch().subscribe()?;
std::thread::spawn(move || {
while let Ok(event) = rx.recv() {
let mut cache = cache_clone.write().unwrap();
match event.operation {
ChangeOperation::Insert | ChangeOperation::Update => {
if let Some(doc) = event.document {
cache.insert(event.doc_id, doc);
}
}
ChangeOperation::Delete => {
cache.remove(&event.doc_id);
}
}
}
});
Notes
- Watchers are automatically cleaned up when the
WatchHandle is dropped
- Delete operations have
document: None in the ChangeEvent
- Filters on delete operations will not match (no document to filter)
- Multiple watchers can be active on the same collection
- Changes are emitted after transaction commit
- Watchers receive events for all operations, including those in transactions