Pipeline architecture
BOOM’s pipeline consists of a Kafka consumer followed by three worker types, each handling a specific task.

Stage 1: Kafka consumer
The Kafka consumer is the entry point for alerts into the BOOM pipeline.

What it does
- Reads alerts from survey Kafka topics (ZTF, LSST, DECam)
- Deserializes Avro-encoded alert packets
- Transfers alerts to Valkey (Redis-compatible) in-memory queues
- Manages backpressure with configurable queue limits
Configuration
Kafka connection settings are defined in config.yaml.
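As a hedged sketch of what such a section might contain (every key name below is an illustrative assumption, not BOOM's actual schema):

```yaml
# Illustrative sketch only -- key names are assumptions, not BOOM's schema.
kafka:
  bootstrap_servers: "broker1:9092,broker2:9092"
  group_id: "boom-consumer"
  topics:
    ztf: "ztf-alerts"
    lsst: "lsst-alerts"
valkey:
  url: "redis://localhost:6379"
  max_in_queue: 10000   # backpressure: cap on alerts buffered per queue
```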
The consumer uses a unique instance_id UUID to distinguish metrics from different instances.

Metrics
The consumer exports Prometheus metrics:

- kafka_consumer_alert_processed_total: Total alerts consumed
- Throughput: irate(kafka_consumer_alert_processed_total[5m])
Stage 2: Alert ingestion workers
Alert workers read from Valkey queues and prepare alerts for database storage.

Responsibilities
- Format conversion: Convert alerts to BSON documents for MongoDB
- Crossmatching: Query archival catalogs for sources near the alert position
- Database write: Store formatted alerts in MongoDB collections
- Queue for enrichment: Add alerts to enrichment processing queue
Crossmatching catalogs
Crossmatches are configured per survey in config.yaml.
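A per-survey crossmatch entry might look roughly like the sketch below; the key names and field projections are illustrative assumptions, not BOOM's actual schema:

```yaml
# Illustrative sketch only -- key names are assumptions.
crossmatch:
  ZTF:
    - catalog: PS1_DR1
      radius: 2.0          # search radius in arcsec around the alert position
      projection:          # return only the fields filters actually need
        _id: 1
        gMeanPSFMag: 1
    - catalog: Gaia_DR3
      radius: 1.5
      projection:
        _id: 1
        parallax: 1
```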
Metrics
Alert workers track:

- alert_worker_active: Alerts currently being processed
- alert_worker_alert_processed_total: Total alerts processed
- Processing rate: irate(alert_worker_alert_processed_total[5m])
Stage 3: Enrichment workers
Enrichment workers add scientific context by running classification models.

Classification pipeline
Run classifiers
Execute machine learning models to classify alerts by type:
- Supernovae
- Active galactic nuclei
- Variable stars
- Asteroids
- Artifacts
Batch processing
Enrichment workers process multiple alerts simultaneously to:

- Amortize model loading overhead
- Maximize GPU utilization
- Improve throughput
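The batching pattern can be sketched in a few lines of Python; the queue and model interfaces here (pop_batch, classify) are hypothetical stand-ins for illustration, not BOOM's actual API:

```python
# Sketch of an enrichment batching loop; pop_batch and classify are
# hypothetical stand-ins for the worker's queue and model interfaces.
def run_enrichment(queue, model, batch_size=32):
    while True:
        batch = queue.pop_batch(batch_size)  # up to batch_size alerts at once
        if not batch:
            break
        # A single model call classifies the whole batch, amortizing
        # per-call overhead and keeping the GPU busy.
        scores = model.classify([alert["cutouts"] for alert in batch])
        for alert, score in zip(batch, scores):
            alert["classifications"] = score
```

The trade-off is visible in the loop: a larger batch_size means fewer model calls per alert, but the first alert in a batch waits longer before its results are written.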
Metrics
Enrichment workers export:

- enrichment_worker_active: Alerts currently being enriched
- enrichment_worker_batch_processed_total: Total batches processed
- enrichment_worker_alert_processed_total: Total alerts enriched
- Average batch size: irate(enrichment_worker_alert_processed_total[5m]) / irate(enrichment_worker_batch_processed_total[5m])
Stage 4: Filter workers
Filter workers execute user-defined MongoDB aggregation pipelines to identify interesting alerts.

How filters work
- Load filters: Read active filter definitions from the filters MongoDB collection
- Build pipeline: Construct a MongoDB aggregation pipeline with:
  - Auxiliary data lookups (previous photometry, forced photometry)
  - User-defined filter stages
  - Permission checks (for ZTF data rights)
- Execute filters: Run the aggregation pipeline against alert collections
- Send results: Publish matching alerts to Kafka output topics
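Conceptually, the assembled pipeline could resemble the JSON below. The stage contents and field names (objectId, candidate.magpsf, candidate.drb) follow common ZTF alert conventions but are illustrative, not BOOM's exact output:

```json
[
  { "$match": { "_id": { "$in": ["<alert ids in this batch>"] } } },
  { "$lookup": {
      "from": "ZTF_alerts_aux",
      "localField": "objectId",
      "foreignField": "_id",
      "as": "aux"
  } },
  { "$match": {
      "candidate.magpsf": { "$lt": 19.0 },
      "candidate.drb": { "$gt": 0.9 }
  } }
]
```

Here the first $match selects the current batch of alerts, the $lookup pulls in auxiliary data, and the final $match stands in for the user-defined filter stages.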
Filter structure
Filters are survey-specific: a filter created for ZTF won’t run on LSST alerts.
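A filter document in the filters collection might be shaped roughly like the sketch below; the field names are illustrative assumptions, not BOOM's actual schema:

```json
{
  "_id": "example-filter-id",
  "catalog": "ZTF_alerts",
  "group_id": 41,
  "permissions": [1, 2],
  "active": true,
  "pipeline": [
    { "$match": { "candidate.drb": { "$gt": 0.9 } } },
    { "$project": { "objectId": 1, "candidate.magpsf": 1 } }
  ]
}
```

In a shape like this, the catalog field is what ties a filter to a single survey's alert collection.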
Output format
Alerts that pass filters are serialized to Avro format and sent to Kafka.

Metrics
Filter workers track:

- filter_worker_active: Alerts currently being filtered
- filter_worker_batch_processed_total: Total filter batches executed
- filter_worker_alert_processed_total: Total alerts filtered, labeled by reason (passed or failed)
Worker scaling
The number of workers for each stage is configured in config.yaml.
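A hedged sketch of such a section (key names are assumptions, not BOOM's actual schema):

```yaml
# Illustrative sketch only -- key names are assumptions.
workers:
  alert: 4        # ingestion: CPU-bound (BSON conversion, crossmatching)
  enrichment: 2   # ML inference: typically GPU-bound
  filter: 2       # aggregation pipelines: MongoDB-bound
```

Each stage can be scaled independently; the per-stage *_active and *_processed_total metrics show where the bottleneck is.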
Database collections
BOOM uses MongoDB collections organized by survey:

ZTF collections

- ZTF_alerts: Primary alert documents with candidate data
- ZTF_alerts_aux: Auxiliary data (previous candidates, forced photometry)
- ZTF_alerts_cutouts: Image cutouts (science, template, difference)
LSST collections
- LSST_alerts: Primary alert documents
- LSST_alerts_aux: Auxiliary data including previous DIASources
Shared collections
- filters: User-defined filter definitions
- Catalog collections: PS1_DR1, Gaia_DR3, NED, etc.
Performance considerations
Batch size tuning
Enrichment and filter workers process alerts in batches. Larger batches improve throughput but increase latency. Adjust batch sizes based on your latency requirements.
MongoDB indexing
BOOM automatically creates indexes for:
- Alert IDs (_id)
- Object IDs (objectId)
- Timestamps (candidate.jd)
- Sky coordinates for spatial queries
Crossmatch optimization
Use the projection field to limit crossmatch results to only the needed fields. Set appropriate radius values to avoid retrieving too many candidates.

Memory management
The Valkey queue size (--max-in-queue) limits memory usage. If the consumer outpaces the workers, reduce this value to prevent memory exhaustion.

Next steps
- Creating filters: Write MongoDB aggregation pipelines to identify interesting alerts
- Monitoring: Track pipeline performance with Prometheus metrics