Worker types
BOOM implements three types of workers:
Alert workers
Purpose: Ingest alerts from Kafka, perform crossmatching, and store results in MongoDB.
Input: Avro-encoded alerts from Redis queue alerts_{survey}
Output:
- Alert documents in MongoDB collection alerts_{survey}
- Candidate IDs to Redis queue classification_{survey}
Implementation: src/alert/base.rs:852-969
Processing model: One alert at a time, with a temporary queue for reliability
Enrichment workers
Purpose: Run machine learning classifiers and update alerts with classification scores.
Input: Candidate IDs from Redis queue classification_{survey}
Output:
- Updated classification results in MongoDB
- Object IDs to Redis queue filter_{survey}
Implementation: src/enrichment/base.rs:192-282
Processing model: Batch processing, up to 1000 alerts at once
Filter workers
Purpose: Apply user-defined filters and publish matching alerts to Kafka.
Input: Alert identifiers from Redis queue filter_{survey}
Output: Matching alerts published to Kafka topic {filter_name}
Implementation: src/filter/base.rs:850-969
Processing model: Batch processing, up to 1000 alerts per filter

Worker traits
Each worker type implements a trait defining its interface:
- AlertWorker trait
- EnrichmentWorker trait
- FilterWorker trait
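As a rough sketch of the shape these traits take: the trait names come from the docs above, but the method names, signatures, and the NoopEnricher type below are illustrative assumptions (the real definitions, in the source files listed above, are presumably async and considerably richer).

```rust
// Hypothetical, simplified versions of the three worker traits.
// See src/alert/, src/enrichment/, and src/filter/ for the real definitions.

trait AlertWorker {
    /// Ingest one Avro-encoded alert; return the candidate ID on success.
    fn process_alert(&mut self, avro_bytes: &[u8]) -> Result<String, String>;
}

trait EnrichmentWorker {
    /// Classify a batch of candidates; return object IDs to pass downstream.
    fn process_batch(&mut self, candids: &[String]) -> Result<Vec<String>, String>;
}

trait FilterWorker {
    /// Run all active filters over a batch; return alerts that passed.
    fn run_filters(&mut self, object_ids: &[String]) -> Result<Vec<String>, String>;
}

// A dummy implementation, just to show the shape.
struct NoopEnricher;

impl EnrichmentWorker for NoopEnricher {
    fn process_batch(&mut self, candids: &[String]) -> Result<Vec<String>, String> {
        Ok(candids.to_vec()) // pass IDs through unchanged
    }
}

fn main() {
    let mut w = NoopEnricher;
    let out = w.process_batch(&["cand1".into(), "cand2".into()]).unwrap();
    assert_eq!(out.len(), 2);
}
```

Survey-specific implementations then only need to supply the per-survey logic behind these methods.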
Survey-specific implementations (ZTF, LSST, DECam) live in the src/alert/, src/enrichment/, and src/filter/ directories.

Worker lifecycle
Initialization
Workers are spawned by the scheduler. On startup, each worker:
- Loads configuration from config.yaml
- Establishes database and Redis connections
- Enters its processing loop
Processing loop
Workers run an infinite processing loop with periodic command checking: rather than polling on every iteration, workers check for termination signals periodically (default: every 500 alerts) for efficiency.
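The loop described above can be sketched as follows. This uses std::sync::mpsc for brevity where the real code uses Tokio channels; the command_interval parameter and the WorkerCmd name follow the surrounding docs, everything else is assumed.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

enum WorkerCmd {
    Terminate, // graceful shutdown request
}

/// Process items until the input is exhausted or Terminate arrives.
/// Commands are only polled every `command_interval` items, not every iteration.
fn worker_loop(items: Vec<u64>, cmd_rx: Receiver<WorkerCmd>, command_interval: usize) -> usize {
    let mut processed = 0;
    for _item in items {
        // Periodic, cheap check for a termination signal.
        if processed % command_interval == 0 {
            match cmd_rx.try_recv() {
                Ok(WorkerCmd::Terminate) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            }
        }
        // ... process the item (ingest, classify, or filter) ...
        processed += 1;
    }
    processed
}

fn main() {
    let (tx, rx) = channel();
    // No Terminate is ever sent, so the worker drains the whole input.
    let n = worker_loop((0..1000).collect(), rx, 500);
    assert_eq!(n, 1000);
    drop(tx);
}
```

Checking the channel only every command_interval items keeps the hot path free of per-item synchronization overhead, at the cost of slightly delayed shutdown.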
Graceful shutdown
When the scheduler receives SIGINT (Ctrl+C):
- The scheduler sends termination signals to all workers
- Workers finish processing their current alerts
- Workers exit their loops and clean up
- The scheduler waits for all workers to terminate
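The scheduler side of this sequence can be sketched with plain threads (the real scheduler uses Tokio tasks and channels; the shutdown helper and worker bodies here are illustrative):

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

enum WorkerCmd {
    Terminate,
}

/// Send Terminate to every worker, then wait for all of them to exit.
/// Returns the number of workers joined successfully.
fn shutdown(cmd_txs: Vec<Sender<WorkerCmd>>, handles: Vec<thread::JoinHandle<usize>>) -> usize {
    for tx in &cmd_txs {
        let _ = tx.send(WorkerCmd::Terminate); // 1. signal every worker
    }
    let mut joined = 0;
    for h in handles {
        if h.join().is_ok() {
            joined += 1; // 2. wait for each worker to terminate
        }
    }
    joined
}

fn main() {
    let mut cmd_txs = Vec::new();
    let mut handles = Vec::new();
    for id in 0..3 {
        let (tx, rx) = channel();
        cmd_txs.push(tx);
        handles.push(thread::spawn(move || {
            // Block until a command arrives (real workers poll periodically).
            while let Ok(WorkerCmd::Terminate) = rx.recv() {
                break; // finish current work, clean up, and exit
            }
            id // returned to the scheduler on join
        }));
    }
    // This is what the scheduler would do on SIGINT:
    assert_eq!(shutdown(cmd_txs, handles), 3);
}
```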
Worker configuration
Worker counts are configured per survey in config.yaml:
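The exact schema isn't reproduced here; as a purely illustrative sketch, a per-survey worker section might look like the following (the nesting, survey and stage names, and values are assumptions — only the n_workers and command_interval keys come from these docs, and the real layout should be checked against the repository's config.yaml):

```yaml
# Hypothetical config.yaml fragment -- verify against the real schema.
ztf:
  alert:
    n_workers: 4          # worker processes for this stage
    command_interval: 500 # check for termination every 500 items
  enrichment:
    n_workers: 3
    command_interval: 500
  filter:
    n_workers: 1          # set to 0 to disable the stage entirely
    command_interval: 500
```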
Parameters explained
- command_interval: How often workers check for termination signals, in number of processed items
- n_workers: Number of worker processes for this stage
- Setting n_workers: 0 disables that stage entirely
Worker counts should be tuned based on throughput requirements and available resources. Monitor queue depths to determine if you need more workers.
Worker monitoring
Heartbeat logs
The scheduler logs worker counts every 60 seconds in the form {live}/{total} workers.
Prometheus metrics
Worker activity is exposed as Prometheus metrics:
- Active worker gauge
- Processing counters

Counter labels include:
- worker.id: UUID of the worker process
- status: ok or error
- reason: added, exists, input_queue, processing, output_queue, etc.
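As a rough illustration of how such labeled counters behave, here is a toy stand-in using a plain HashMap instead of the actual Prometheus client (the status and reason label values come from the list above; the type and method names are made up):

```rust
use std::collections::HashMap;

/// A toy stand-in for a Prometheus counter with (status, reason) labels.
#[derive(Default)]
struct ProcessedCounter {
    counts: HashMap<(String, String), u64>,
}

impl ProcessedCounter {
    /// Increment the counter for one (status, reason) label combination.
    fn inc(&mut self, status: &str, reason: &str) {
        *self
            .counts
            .entry((status.to_string(), reason.to_string()))
            .or_insert(0) += 1;
    }

    /// Read the current count for a label combination (0 if never incremented).
    fn get(&self, status: &str, reason: &str) -> u64 {
        *self
            .counts
            .get(&(status.to_string(), reason.to_string()))
            .unwrap_or(&0)
    }
}

fn main() {
    let mut c = ProcessedCounter::default();
    c.inc("ok", "added");
    c.inc("ok", "added");
    c.inc("error", "processing");
    assert_eq!(c.get("ok", "added"), 2);
    assert_eq!(c.get("error", "processing"), 1);
    assert_eq!(c.get("ok", "exists"), 0);
}
```

Each distinct label combination is an independent time series, which is why queries can slice processing throughput by outcome.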
Example Prometheus queries
See the README for links to pre-configured Prometheus dashboards.
Worker scaling
Current implementation
Worker counts are currently static: they're set at startup from config.yaml and don't change.
To change worker counts:
- Stop the scheduler (Ctrl+C)
- Update config.yaml
- Restart the scheduler
Future: Dynamic scaling
Dynamic worker scaling is planned for future releases. It will:
- Monitor queue depths in Redis
- Scale workers up when queues grow
- Scale workers down when queues are empty
- Respect min/max worker bounds
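A sketch of the kind of decision logic such an autoscaler might use (entirely hypothetical — the function, thresholds, and bounds are made-up parameters, not planned API):

```rust
/// Pick a target worker count from the current queue depth, clamped to bounds.
/// `scale_up_depth` is the per-worker backlog above which we add a worker;
/// an empty queue shrinks the pool one step toward the minimum.
fn target_workers(
    current: usize,
    queue_depth: usize,
    min: usize,
    max: usize,
    scale_up_depth: usize,
) -> usize {
    let target = if queue_depth == 0 {
        current.saturating_sub(1) // queue drained: scale down one step
    } else if queue_depth > current * scale_up_depth {
        current + 1 // queue growing faster than workers drain it
    } else {
        current // steady state
    };
    target.clamp(min, max) // respect min/max worker bounds
}

fn main() {
    assert_eq!(target_workers(3, 10_000, 1, 8, 1000), 4); // deep queue: scale up
    assert_eq!(target_workers(3, 500, 1, 8, 1000), 3);    // steady state
    assert_eq!(target_workers(3, 0, 1, 8, 1000), 2);      // empty queue: scale down
    assert_eq!(target_workers(1, 0, 1, 8, 1000), 1);      // respects the minimum
}
```

Stepping by one worker at a time avoids oscillation when queue depth hovers near a threshold.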
Scaling guidelines
Alert workers: Scale based on Kafka consumption rate
- More workers if alerts are backing up in Redis
- Each worker processes one alert at a time
- Typical: 3-5 workers per survey

Enrichment workers:
- More workers if the classification queue grows
- Workers process batches of up to 1000 alerts
- Typical: 3-5 workers, more for expensive models

Filter workers:
- More workers if the filter queue grows
- Workers run all active filters on each batch
- Typical: 1-3 workers if filters are simple queries
Start with the default worker counts and monitor queue depths. Increase counts gradually if queues consistently grow.
Worker communication
Command channels
Workers receive commands via Tokio MPSC channels:
- WorkerCmd::Terminate: Graceful shutdown request
Queue naming convention
Redis queues follow a consistent naming pattern:
- Alert input: alerts_{survey} (e.g., alerts_ztf)
- Alert temp: alerts_{survey}_temp (for reliability)
- Classification: classification_{survey}
- Filter: filter_{survey}
Queue entries are survey-specific identifiers:
- ZTF: {programid},{candid} (e.g., 1,2458343.1234567)
- LSST: {object_id} (e.g., LSSTOBJ123456)
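For illustration, encoding and decoding a ZTF-style queue entry might look like this (the helper names are hypothetical; only the {programid},{candid} format comes from the docs above):

```rust
/// Encode a ZTF queue entry as "{programid},{candid}".
fn encode_ztf_entry(programid: u8, candid: &str) -> String {
    format!("{},{}", programid, candid)
}

/// Decode a ZTF queue entry back into (programid, candid);
/// returns None for malformed entries.
fn decode_ztf_entry(entry: &str) -> Option<(u8, String)> {
    let (pid, candid) = entry.split_once(',')?;
    Some((pid.parse().ok()?, candid.to_string()))
}

fn main() {
    let entry = encode_ztf_entry(1, "2458343.1234567");
    assert_eq!(entry, "1,2458343.1234567");
    assert_eq!(decode_ztf_entry(&entry), Some((1, "2458343.1234567".to_string())));
    assert_eq!(decode_ztf_entry("not-an-entry"), None);
}
```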
Error handling between stages
Each stage handles errors independently. Failed alerts remain in temporary queues and can be retried by restarting the worker pool.
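The temporary-queue pattern behind this can be sketched with in-memory queues. In Redis this would be a pop-to-temp / acknowledge sequence; the VecDeque model and the process_one helper below are illustrative, not the repository's code.

```rust
use std::collections::VecDeque;

/// Pop an item from the input queue into a temp queue, process it, and
/// only remove it from the temp queue on success. On failure the item
/// stays in the temp queue, so a restarted worker can retry it.
/// Returns None when the input queue is empty.
fn process_one(
    input: &mut VecDeque<String>,
    temp: &mut VecDeque<String>,
    process: impl Fn(&str) -> bool,
) -> Option<bool> {
    let item = input.pop_front()?;
    temp.push_back(item.clone()); // held here until acknowledged
    if process(&item) {
        temp.pop_back(); // success: acknowledge by removing from temp
        Some(true)
    } else {
        Some(false) // failure: item remains in temp for retry
    }
}

fn main() {
    let mut input: VecDeque<String> = vec!["a".to_string(), "b".to_string()].into();
    let mut temp = VecDeque::new();

    // "a" succeeds, "b" fails.
    assert_eq!(process_one(&mut input, &mut temp, |x| x == "a"), Some(true));
    assert_eq!(process_one(&mut input, &mut temp, |x| x == "a"), Some(false));
    assert_eq!(temp.len(), 1); // "b" survives a crash/restart for retry
    assert_eq!(temp.front().map(String::as_str), Some("b"));
}
```

Because an item is never in neither queue, a worker crash between pop and acknowledge cannot lose an alert, only delay it.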