Overview
Distributed mode separates Lucille components into independent processes that communicate via Kafka. This architecture enables:

- Horizontal scaling - Add more Workers or Indexers to increase throughput
- Fault tolerance - Kafka provides durable message storage and replay
- Independent scaling - Scale each component based on its bottleneck
- High throughput - Process millions of documents efficiently
- Production reliability - Suitable for 24/7 production workloads
Distributed mode requires a running Kafka cluster. Workers and Indexers must be started before the Runner.
Architecture
Components run in separate JVMs and communicate through Kafka topics.

Kafka Topics
Distributed mode uses three Kafka topics per pipeline. For example, a pipeline named processing_pipeline uses the topics processing_pipeline_source, processing_pipeline_destination, and processing_pipeline_events.

| Topic | Producer | Consumer | Purpose |
|---|---|---|---|
| `{pipeline}_source` | Runner (Connector) | Worker(s) | Unprocessed documents |
| `{pipeline}_destination` | Worker(s) | Indexer(s) | Processed documents |
| `{pipeline}_events` | Worker, Indexer | Runner (Publisher) | Lifecycle events |
Configuration
```
kafka {
  bootstrapServers: "kafka-broker1:9092,kafka-broker2:9092"

  # Consumer settings
  consumerGroupId: "lucille_workers"
  maxPollIntervalSecs: 600

  # Producer settings
  maxRequestSize: 250000000 # 250MB for large documents

  # Serialization
  documentDeserializer: "com.kmwllc.lucille.message.KafkaDocumentDeserializer"
  documentSerializer: "com.kmwllc.lucille.message.KafkaDocumentSerializer"

  # Event tracking
  events: true
}
```
```
connectors: [
  {
    class: "com.kmwllc.lucille.connector.FileConnector"
    name: "file_connector"
    pipeline: "processing_pipeline"
    paths: ["s3://bucket/data/*.csv"]
    fileHandlers: { csv: {} }
  }
]

pipelines: [
  {
    name: "processing_pipeline"
    stages: [
      {
        class: "com.kmwllc.lucille.stage.ValidateFields"
        required: ["id", "title"]
      }
    ]
  }
]

indexer {
  type: "OpenSearch"
  batchSize: 500
  batchTimeout: 5000
}

opensearch {
  url: "https://opensearch-cluster:9200"
  index: "documents"
  username: "admin"
  password: "${OPENSEARCH_PASSWORD}"
}
```
Start a Worker for the pipeline:

```shell
java \
  -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Worker \
  processing_pipeline
```
To scale out, run additional Workers with the same configuration:

```shell
# Terminal 1
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Worker processing_pipeline

# Terminal 2
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Worker processing_pipeline

# Terminal 3...
```
Start an Indexer for the pipeline:

```shell
java \
  -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Indexer \
  processing_pipeline
```
Multiple Indexers can run in parallel:

```shell
# Terminal 1
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Indexer processing_pipeline

# Terminal 2
java -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Indexer processing_pipeline
```
Finally, with Workers and Indexers running, start the Runner with the -useKafka flag:

```shell
java \
  -Dconfig.file=application.conf \
  -cp 'lucille-core/target/lucille.jar:lucille-core/target/lib/*' \
  com.kmwllc.lucille.core.Runner \
  -useKafka
```
Scaling Strategies
Scale Workers (Pipeline Processing)
Symptom: Destination topic lag is low, but source topic lag is growing.

Solution: Add more Worker processes:

- Start additional Worker JVMs on the same machine
- Deploy Workers on additional machines
- Increase `worker.numThreads` in existing Workers
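As a sketch, per-process parallelism might be raised with the `worker.numThreads` setting in the same application.conf the Worker is launched with (the block shape here is an assumption; only the key name comes from the text above):

```
worker {
  # Pipeline threads in this Worker JVM. Effective parallelism is
  # also capped by the partition count of the source topic, since
  # Kafka assigns each partition to at most one consumer thread.
  numThreads: 8
}
```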
Scale Indexers (Backend Writes)
Symptom: Destination topic lag is growing.

Solution: Add more Indexer processes or tune batch settings.

Scale Connector (Source Reads)
Symptom: Runner finishes quickly, but overall throughput is low.

Solution: Increase connector parallelism or tune source connector settings.

Kafka Configuration Deep Dive
Consumer Settings
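The consumer-side keys were shown in the sample configuration above; as a sketch of how they interact:

```
kafka {
  # All Workers for a pipeline join this consumer group, so Kafka
  # divides the source topic's partitions among them.
  consumerGroupId: "lucille_workers"

  # Seconds a Worker may spend between polls before the broker assumes
  # it is dead and rebalances its partitions to other Workers. Raise
  # this when individual documents are slow to process.
  maxPollIntervalSecs: 600
}
```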
Producer Settings
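From the sample configuration above, the key producer-side setting is the request size cap; as a sketch:

```
kafka {
  # Upper bound on a single produce request; must be at least as large
  # as the biggest serialized document you expect to publish.
  maxRequestSize: 250000000 # 250MB
}
```

Note that the brokers must also be configured to accept messages this large: Kafka's broker-side `message.max.bytes` and topic-level `max.message.bytes` default to roughly 1MB.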
Event Topic Configuration
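Event publication is toggled by the `events` flag shown earlier; a sketch, with the trade-off (an inference from the topic table above) noted as a comment:

```
kafka {
  # When true, Workers and Indexers publish lifecycle events to the
  # {pipeline}_events topic, which the Runner consumes to track
  # document completion. Setting it to false reduces Kafka traffic
  # at the cost of that tracking.
  events: true
}
```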
Hybrid Mode: Kafka Local
For testing distributed behavior without deploying multiple processes, hybrid mode:

- Launches Worker and Indexer as threads (like local mode)
- Uses Kafka for messaging (like distributed mode)
- Useful for integration testing Kafka configurations
Monitoring and Observability
Log Aggregation
Centralize logs from all processes.

Metrics Collection
Lucille emits metrics via Dropwizard Metrics. Integrate with monitoring systems.

Kafka Monitoring
Monitor topic lag and throughput.

Fault Tolerance and Recovery
Kafka Offset Management
Workers commit offsets only after successful processing. If a Worker crashes, its uncommitted offsets allow another Worker to reprocess those documents from Kafka.
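Lucille's actual offset handling lives inside its Worker classes; the following stdlib-only sketch (all names hypothetical, not Lucille code) just illustrates why commit-after-processing yields at-least-once delivery:

```java
import java.util.function.Consumer;

// Illustrative sketch: an "offset" is advanced only after a document is
// fully processed, so a crash between processing and commit causes
// reprocessing (at-least-once), never loss.
public class OffsetSketch {
    private final String[] log;      // stand-in for a Kafka partition
    private int committedOffset = 0; // stand-in for the consumer-group offset

    OffsetSketch(String[] log) { this.log = log; }

    // Process documents from the last committed offset onward.
    // If process throws, the offset for that document is never committed.
    int run(Consumer<String> process) {
        int handled = 0;
        for (int pos = committedOffset; pos < log.length; pos++) {
            process.accept(log[pos]);  // may throw: offset stays put
            committedOffset = pos + 1; // commit only after success
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        OffsetSketch sketch = new OffsetSketch(new String[]{"doc1", "doc2", "doc3"});
        try { // first worker crashes while processing doc2
            sketch.run(doc -> { if (doc.equals("doc2")) throw new RuntimeException("crash"); });
        } catch (RuntimeException e) { /* worker died; committed offset still 1 */ }
        // a replacement worker resumes at doc2 and reprocesses it
        System.out.println(sketch.run(doc -> {})); // prints 2 (doc2 and doc3)
    }
}
```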
Indexer Failure Handling
The Indexer sends failure events for documents that cannot be indexed.

Graceful Shutdown
All components handle SIGINT gracefully:
Worker Shutdown:
Deployment Patterns
Containerized Deployment (Docker/Kubernetes)
Systemd Services (Bare Metal/VMs)
Best Practices
- Partition topics with sufficient partitions for parallelism (e.g., 10-20 partitions)
- Tune batch sizes to balance latency vs throughput
- Use compression on Kafka producers (gzip or snappy)
- Monitor JVM heap - allocate enough memory for worker threads and batches
- Benchmark pipelines before production to identify bottlenecks
Troubleshooting
Workers Not Consuming Messages
Symptom: Source topic has messages, but Workers are idle.

Causes:

- Consumer group rebalancing
- Kafka connectivity issues
- Incorrect pipeline name
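To illustrate the last cause: the pipeline name given on the Worker command line must match a `pipelines[].name` entry, since it determines which topics the Worker subscribes to. A mismatched name leaves the Worker polling a source topic that no connector feeds:

```
# Worker was launched as: ... com.kmwllc.lucille.core.Worker processing_pipeline
pipelines: [
  {
    name: "processing_pipeline" # must match the Worker's command-line argument
    stages: [ ... ]
  }
]
```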
High Memory Usage
Symptom: Workers or Indexers run out of heap memory.

Causes:

- Large documents in memory
- Too many worker threads
- Indexer batches too large
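When diagnosing heap exhaustion, it helps to confirm the ceiling the JVM actually started with (the -Xmx setting, or the JVM default when none is given); a stdlib one-liner:

```java
// Prints the maximum heap this JVM will grow to, in MB.
// Compare it against document sizes, worker thread count, and
// indexer batchSize before deciding whether to raise -Xmx.
public class HeapCheck {
    public static void main(String[] args) {
        long maxMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("Max heap: " + maxMb + " MB");
    }
}
```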
Next Steps
- Production Best Practices - Learn monitoring, performance tuning, and troubleshooting for production
- Kafka Configuration - Deep dive into Kafka configuration options