// Only watch changes to users in NYClet (handle, rx) = users.watch() .filter("city is \"NYC\"") .subscribe()?;thread::spawn(move || { while let Ok(event) = rx.recv() { println!("NYC user changed: {}", event.doc_id); }});// This triggers an event (NYC user)users.insert(json!({"name": "Alice", "city": "NYC"}))?;// This does NOT trigger an event (LA user)users.insert(json!({"name": "Bob", "city": "LA"}))?;
You can have multiple watchers with different filters.
// Watch all changeslet (handle1, rx1) = users.watch().subscribe()?;// Watch only premium userslet (handle2, rx2) = users.watch() .filter("tier is \"premium\"") .subscribe()?;// Watch only NYC userslet (handle3, rx3) = users.watch() .filter("city is \"NYC\"") .subscribe()?;// All three receive events (if they match their filters)users.insert(json!({ "name": "Alice", "tier": "premium", "city": "NYC"}))?;// handle1, handle2, and handle3 all receive this event
// Notify customers when one of their orders matches `status is "completed"`.
let (handle, rx) = orders
    .watch()
    .filter("status is \"completed\"")
    .subscribe()?;

thread::spawn(move || {
    while let Ok(event) = rx.recv() {
        // Events without a document payload carry nothing to notify about.
        if let Some(order) = event.document {
            // Do not `unwrap()` here: a document with a missing or non-string
            // `customer_id` would panic, ending this thread and silently
            // stopping every later notification.
            if let Some(customer_id) = order["customer_id"].as_str() {
                send_notification(
                    customer_id,
                    "Your order has been completed!",
                );
            } else {
                eprintln!(
                    "completed order {} has no string customer_id; skipping notification",
                    event.doc_id
                );
            }
        }
    }
});
let (handle, rx) = products.watch().subscribe()?;thread::spawn(move || { while let Ok(event) = rx.recv() { match event.operation { ChangeOperation::Insert | ChangeOperation::Update => { if let Some(doc) = event.document { // Sync to Elasticsearch, Redis, etc. sync_to_search_engine(&event.doc_id, &doc); } } ChangeOperation::Delete => { // Remove from external service remove_from_search_engine(&event.doc_id); } } }});
// Bad: receive all events, filter in applicationlet (handle, rx) = users.watch().subscribe()?;// ... filter events in your code ...// Good: filter at the sourcelet (handle, rx) = users.watch() .filter("tier is \"premium\"") .subscribe()?;
Process events asynchronously:
let (handle, rx) = users.watch().subscribe()?;

// A dedicated thread owns the receiver and handles events as they arrive;
// it exits the first time `recv()` returns an error.
thread::spawn(move || loop {
    match rx.recv() {
        Ok(event) => {
            // Hand the event off without blocking this loop.
            process_event_async(event);
        }
        Err(_) => break,
    }
});
Event delivery guarantees:
Events are sent at most once (no retries)
If the receiver channel is full or closed, events are dropped
// Watch only during a specific operation{ let (handle, rx) = users.watch().subscribe()?; // Perform operation perform_bulk_import()?; // Process events while let Ok(event) = rx.try_recv() { println!("Imported: {}", event.doc_id); }} // Automatically unsubscribe
Keep event handlers fast: Don’t block event processing with slow operations.
thread::spawn(move || { while let Ok(event) = rx.recv() { // Bad: slow operation blocks event queue make_slow_api_call(&event); // Good: queue for async processing event_queue.send(event).ok(); }});
Clean up handles: Always ensure watchers are unsubscribed to prevent memory leaks.
// Collect every subscription handle so they can all be released together.
// The vector must be `mut`, otherwise the `push` calls below do not compile.
let mut handles = Vec::new();

// `.0` keeps only the handle half of the `(handle, receiver)` pair returned
// by `subscribe()`.
handles.push(users.watch().subscribe()?.0);
handles.push(orders.watch().subscribe()?.0);

// Later: clean up all watchers
for handle in handles {
    handle.unsubscribe();
}