BOOM uses worker pools to parallelize alert processing. Each worker type is specialized for a specific stage of the pipeline, and workers are managed by a central scheduler.

Worker types

BOOM implements three types of workers:
Alert workers

Purpose: Ingest alerts from Kafka, perform crossmatching, and store in MongoDB.
Input: Avro-encoded alerts from Redis queue alerts_{survey}
Output:
  • Alert documents in MongoDB collection alerts_{survey}
  • Candidate IDs to Redis queue classification_{survey}
Implementation: src/alert/base.rs:852-969
Processing model: One alert at a time, with a temporary queue for reliability
Enrichment workers

Purpose: Run machine learning classifiers and update alerts with classification scores.
Input: Candidate IDs from Redis queue classification_{survey}
Output:
  • Updated classification results in MongoDB
  • Object IDs to Redis queue filter_{survey}
Implementation: src/enrichment/base.rs:192-282
Processing model: Batch processing, up to 1000 alerts at once
Filter workers

Purpose: Apply user-defined filters and publish matching alerts to Kafka.
Input: Alert identifiers from Redis queue filter_{survey}
Output: Matching alerts published to Kafka topic {filter_name}
Implementation: src/filter/base.rs:850-969
Processing model: Batch processing, up to 1000 alerts per filter
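The batch limits above shape how the enrichment and filter workers drain their queues. A minimal sketch of the batching arithmetic (the slicing here is illustrative, not BOOM's actual queue-draining code):

```rust
fn main() {
    // A backlog of 2500 candidate IDs, drained in batches of up to 1000
    // (matching the batch limits described above; illustrative only).
    let candidate_ids: Vec<i64> = (0..2500).collect();
    let batches: Vec<&[i64]> = candidate_ids.chunks(1000).collect();

    assert_eq!(batches.len(), 3); // 1000 + 1000 + 500
    assert_eq!(batches[2].len(), 500); // final partial batch
    println!("drained backlog in {} batches", batches.len());
}
```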

Worker traits

Each worker type implements a trait defining its interface:

AlertWorker trait

#[async_trait::async_trait]
pub trait AlertWorker {
    async fn new(config_path: &str) -> Result<Self, AlertWorkerError>
    where
        Self: Sized;
    
    fn stream_name(&self) -> String;
    fn input_queue_name(&self) -> String;
    fn output_queue_name(&self) -> String;
    
    async fn format_and_insert_alert<T: Serialize + Send + Sync>(
        &self,
        candid: i64,
        alert: &T,
        collection: &mongodb::Collection<T>,
    ) -> Result<ProcessAlertStatus, AlertError>;
    
    async fn process_alert(
        self: &mut Self,
        avro_bytes: &[u8],
    ) -> Result<ProcessAlertStatus, AlertError>;
}

EnrichmentWorker trait

#[async_trait::async_trait]
pub trait EnrichmentWorker {
    async fn new(config_path: &str) -> Result<Self, EnrichmentWorkerError>
    where
        Self: Sized;
    
    fn input_queue_name(&self) -> String;
    fn output_queue_name(&self) -> String;
    
    async fn process_alerts(
        &mut self,
        alerts: &[i64],
    ) -> Result<Vec<String>, EnrichmentWorkerError>;
}

FilterWorker trait

#[async_trait::async_trait]
pub trait FilterWorker {
    async fn new(
        config_path: &str,
        filter_ids: Option<Vec<String>>,
    ) -> Result<Self, FilterWorkerError>
    where
        Self: Sized;
    
    fn input_queue_name(&self) -> String;
    fn output_topic_name(&self) -> String;
    fn has_filters(&self) -> bool;
    fn survey() -> Survey;
    
    async fn process_alerts(
        &mut self,
        alerts: &[String]
    ) -> Result<Vec<Alert>, FilterWorkerError>;
}
Survey-specific implementations (ZTF, LSST, DECam) are in src/alert/, src/enrichment/, and src/filter/ directories.

Worker lifecycle

Initialization

Workers are spawned by the scheduler:
let alert_pool = ThreadPool::new(
    WorkerType::Alert,
    n_alert as usize,
    args.survey.clone(),
    config_path.clone(),
);
Each worker:
  1. Loads configuration from config.yaml
  2. Establishes database and Redis connections
  3. Enters processing loop

Processing loop

Workers run an infinite loop with command checking:
loop {
    // Check for termination signal every N iterations
    if command_check_countdown == 0 {
        if should_terminate(&mut receiver) {
            break;
        }
        command_check_countdown = command_interval;
    }
    command_check_countdown -= 1;
    
    // Process alerts...
}
Workers check for termination signals periodically (default: every 500 alerts) rather than on every iteration for efficiency.
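A runnable, std-only sketch of this countdown pattern. The real workers use Tokio channels; `WorkerCmd`, `should_terminate`, and the processing stand-in below are simplifications:

```rust
use std::sync::mpsc;

const COMMAND_INTERVAL: u32 = 500;

enum WorkerCmd {
    Terminate,
}

// Non-blocking check: a pending Terminate command means shut down.
fn should_terminate(rx: &mpsc::Receiver<WorkerCmd>) -> bool {
    matches!(rx.try_recv(), Ok(WorkerCmd::Terminate))
}

// Process "alerts" until a termination signal is observed at a check point.
fn run_worker(rx: mpsc::Receiver<WorkerCmd>) -> u64 {
    let mut processed: u64 = 0;
    let mut countdown = COMMAND_INTERVAL;
    loop {
        if countdown == 0 {
            if should_terminate(&rx) {
                break;
            }
            countdown = COMMAND_INTERVAL;
        }
        countdown -= 1;
        processed += 1; // stand-in for processing one alert
    }
    processed
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Signal termination up front; the worker only notices it at the
    // next check point, i.e. after COMMAND_INTERVAL iterations.
    tx.send(WorkerCmd::Terminate).unwrap();
    let processed = run_worker(rx);
    assert_eq!(processed, COMMAND_INTERVAL as u64);
    println!("worker processed {} alerts before terminating", processed);
}
```

Note the trade-off: a larger `command_interval` means less channel-polling overhead, but a slower response to shutdown requests.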

Graceful shutdown

When the scheduler receives SIGINT (Ctrl+C):
  1. Scheduler sends termination signals to all workers
  2. Workers finish processing current alerts
  3. Workers exit their loops and clean up
  4. Scheduler waits for all workers to terminate
// Scheduler shutdown handler
tokio::spawn(
    async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to listen for ctrl-c event");
        info!("received ctrl-c, sending shutdown signal");
        shutdown_tx.send(())
            .expect("failed to send shutdown signal");
    }
);
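The four-step sequence above can be sketched with plain std threads and channels. This is a simplification: the real scheduler runs workers as Tokio tasks, and the function name here is illustrative:

```rust
use std::sync::mpsc;
use std::thread;

enum WorkerCmd {
    Terminate,
}

// Spawn `n` workers, signal them all to terminate, and wait for each one
// to exit; returns how many workers shut down cleanly.
fn spawn_and_shutdown(n: usize) -> usize {
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..n {
        let (tx, rx) = mpsc::channel::<WorkerCmd>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            // Worker blocks until the scheduler requests termination.
            while let Ok(WorkerCmd::Terminate) = rx.recv() {
                break; // exit the processing loop and clean up
            }
        }));
    }
    // 1. Scheduler sends termination signals to all workers.
    for tx in &senders {
        tx.send(WorkerCmd::Terminate).unwrap();
    }
    // 4. Scheduler waits for all workers to terminate.
    handles.into_iter().filter_map(|h| h.join().ok()).count()
}

fn main() {
    assert_eq!(spawn_and_shutdown(3), 3);
    println!("all workers terminated cleanly");
}
```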
Graceful shutdown is still a work in progress; you may see error messages in the logs during shutdown.

Worker configuration

Worker counts are configured per survey in config.yaml:
workers:
  ztf:
    command_interval: 500
    alert:
      n_workers: 3
    enrichment:
      n_workers: 3
    filter:
      n_workers: 3
  lsst:
    command_interval: 500
    alert:
      n_workers: 4
    enrichment:
      n_workers: 3
    filter:
      n_workers: 1
  decam:
    command_interval: 500
    alert:
      n_workers: 1
    enrichment:
      n_workers: 0
    filter:
      n_workers: 1

Parameters explained

  • command_interval: How often workers check for termination signals (in number of processed items)
  • n_workers: Number of worker processes for this stage
  • Setting n_workers: 0 disables that stage entirely
Worker counts should be tuned based on throughput requirements and available resources. Monitor queue depths to determine if you need more workers.

Worker monitoring

Heartbeat logs

The scheduler logs worker counts every 60 seconds:
2024-06-17T12:34:56.789Z INFO scheduler: heartbeat: workers running alert="3/3" enrichment="3/3" filter="3/3"
The format is {live}/{total} workers.
If live worker count is less than total, workers may have crashed. Check logs for errors.

Prometheus metrics

Worker activity is exposed as Prometheus metrics:

Active worker gauge

static ACTIVE: LazyLock<UpDownCounter<i64>> = LazyLock::new(|| {
    SCHEDULER_METER
        .i64_up_down_counter("alert_worker.active")
        .with_unit("{alert}")
        .with_description("Number of alerts currently being processed.")
        .build()
});

Processing counters

static ALERT_PROCESSED: LazyLock<Counter<u64>> = LazyLock::new(|| {
    SCHEDULER_METER
        .u64_counter("alert_worker.alert.processed")
        .with_unit("{alert}")
        .with_description("Number of alerts processed.")
        .build()
});
Metrics include labels:
  • worker.id: UUID of the worker process
  • status: ok or error
  • reason: added, exists, input_queue, processing, output_queue, etc.

Example Prometheus queries

# Total alerts processed
sum(alert_worker_alert_processed_total)

# Processing rate (alerts per second)
irate(alert_worker_alert_processed_total[5m])

# Error rate by reason
sum by (reason) (rate(alert_worker_alert_processed_total{status="error"}[5m]))

# Active alerts being processed
sum(alert_worker_active)
See the README for links to pre-configured Prometheus dashboards.

Worker scaling

Current implementation

Worker counts are currently static: they’re set at startup from config.yaml and don’t change. To change worker counts:
  1. Stop the scheduler (Ctrl+C)
  2. Update config.yaml
  3. Restart the scheduler
Workers will not scale automatically based on load in the current version.

Future: Dynamic scaling

Dynamic worker scaling is planned for future releases. It will:
  • Monitor queue depths in Redis
  • Scale workers up when queues grow
  • Scale workers down when queues are empty
  • Respect min/max worker bounds

Scaling guidelines

Alert workers: Scale based on Kafka consumption rate
  • More workers if alerts are backing up in Redis
  • Each worker processes one alert at a time
  • Typical: 3-5 workers per survey
Enrichment workers: Scale based on model inference time
  • More workers if classification queue grows
  • Workers process batches of 1000 alerts
  • Typical: 3-5 workers, more for expensive models
Filter workers: Scale based on number of filters and complexity
  • More workers if filter queue grows
  • Workers run all active filters on each batch
  • Typical: 1-3 workers if filters are simple queries
Start with the default worker counts and monitor queue depths. Increase counts gradually if queues consistently grow.

Worker communication

Command channels

Workers receive commands via Tokio MPSC channels:
let (tx, rx) = mpsc::channel::<WorkerCmd>(1);
Commands:
  • WorkerCmd::Terminate: Graceful shutdown request

Queue naming convention

Redis queues follow a consistent naming pattern:
  • Alert input: alerts_{survey} (e.g., alerts_ztf)
  • Alert temp: alerts_{survey}_temp (for reliability)
  • Classification: classification_{survey}
  • Filter: filter_{survey}
Filter workers may use survey-specific identifiers for routing:
  • ZTF: {programid},{candid} (e.g., 1,2458343.1234567)
  • LSST: {object_id} (e.g., LSSTOBJ123456)
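These conventions are simple enough to capture as helper functions. The helpers below are hypothetical (BOOM builds these names internally; the function names are illustrative):

```rust
// Hypothetical helpers mirroring the Redis queue naming convention.
fn alert_queue(survey: &str) -> String {
    format!("alerts_{}", survey)
}

fn alert_temp_queue(survey: &str) -> String {
    format!("alerts_{}_temp", survey)
}

fn classification_queue(survey: &str) -> String {
    format!("classification_{}", survey)
}

fn filter_queue(survey: &str) -> String {
    format!("filter_{}", survey)
}

// Split a ZTF filter-queue identifier of the form "{programid},{candid}".
fn split_ztf_identifier(id: &str) -> Option<(&str, &str)> {
    id.split_once(',')
}

fn main() {
    assert_eq!(alert_queue("ztf"), "alerts_ztf");
    assert_eq!(alert_temp_queue("ztf"), "alerts_ztf_temp");
    assert_eq!(classification_queue("lsst"), "classification_lsst");
    assert_eq!(filter_queue("decam"), "filter_decam");
    assert_eq!(
        split_ztf_identifier("1,2458343.1234567"),
        Some(("1", "2458343.1234567"))
    );
}
```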

Error handling between stages

Each stage handles errors independently:
match result {
    Ok(ProcessAlertStatus::Added(candid)) => {
        // Success: queue for next stage
        con.lpush::<&str, i64, isize>(&output_queue, candid).await?;
        con.lrem::<&str, Vec<u8>, isize>(temp_queue, 1, avro_bytes).await?;
    }
    Ok(ProcessAlertStatus::Exists(candid)) => {
        // Already processed: skip
        con.lrem::<&str, Vec<u8>, isize>(temp_queue, 1, avro_bytes).await?;
    }
    Err(error) => {
        // Error: log and skip (alert remains in temp queue)
        warn!("error processing alert: {}", error);
    }
}
Failed alerts remain in temporary queues and can be retried by restarting the worker pool.
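The retry behavior can be illustrated with an in-memory model of the queues. This is a sketch only: the real workers operate on Redis lists via LPUSH/LREM, and the processing closure stands in for a full pipeline stage:

```rust
use std::collections::VecDeque;

// In-memory stand-ins for the input, temporary, and output Redis lists.
struct Queues {
    input: VecDeque<i64>,
    temp: Vec<i64>,
    output: VecDeque<i64>,
}

fn process_one(q: &mut Queues, process: impl Fn(i64) -> Result<(), String>) {
    if let Some(candid) = q.input.pop_front() {
        // Park the alert in the temp queue first, so a crash
        // mid-processing leaves it recoverable on restart.
        q.temp.push(candid);
        match process(candid) {
            Ok(()) => {
                q.output.push_back(candid); // queue for the next stage
                q.temp.retain(|c| *c != candid); // LREM equivalent
            }
            Err(error) => {
                // Log and skip: the alert remains in the temp queue.
                eprintln!("error processing alert {}: {}", candid, error);
            }
        }
    }
}

fn main() {
    let mut q = Queues {
        input: VecDeque::from([101, 102]),
        temp: Vec::new(),
        output: VecDeque::new(),
    };
    process_one(&mut q, |_| Ok(()));
    process_one(&mut q, |c| Err(format!("transient failure on {}", c)));

    assert_eq!(q.output, VecDeque::from([101])); // 101 moved to next stage
    assert_eq!(q.temp, vec![102]); // 102 stays parked, awaiting retry
}
```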

Worker best practices

  1. Start with defaults: Use the worker counts from config.yaml as a starting point.
  2. Monitor queue depths: Watch Redis list lengths to identify bottlenecks:
     redis-cli llen alerts_ztf
     redis-cli llen classification_ztf
     redis-cli llen filter_ztf
  3. Check Prometheus metrics: Monitor processing rates and error rates for each worker type.
  4. Scale incrementally: Increase worker counts by 1-2 at a time and observe the impact.
  5. Balance resources: Ensure adequate database connections, memory, and CPU for the configured worker counts.
Too many workers can overwhelm MongoDB with concurrent connections. The max_pool_size configuration limits database connections.
