
Overview

Workers are background processes that consume messages from Kafka topics and perform asynchronous operations. They form the processing backbone of Chronoverse, handling everything from job scheduling to log persistence. All workers are horizontally scalable through Kafka consumer groups, enabling high throughput and fault tolerance.

Worker Architecture

Scheduling Worker

Purpose

Identifies workflows that are due for execution and creates corresponding job records.

Architecture

  • PostgreSQL: Stores workflows and jobs
  • Kafka: Publishes job events
  • Configuration: Poll interval, batch size, fetch limit
SchedulingWorker:
  PollInterval: 10s      # How often to check for due workflows
  FetchLimit: 100        # Max workflows to fetch per poll
  BatchSize: 50          # Jobs to create per transaction
  ContextTimeout: 30s    # Timeout for database operations

Workflow

1

Poll Database

Every poll interval, query PostgreSQL for workflows due for execution:
SELECT * FROM workflows
WHERE terminated_at IS NULL
  AND build_status = 'COMPLETED'
  AND (
    last_execution_time IS NULL
    OR last_execution_time + (interval * INTERVAL '1 minute') <= NOW()
  )
LIMIT {fetch_limit}
2

Create Job Records

For each workflow, create a job entry:
  • Generate unique job ID (UUID)
  • Set scheduled_at to current time
  • Set status to PENDING
  • Set trigger to AUTOMATIC
  • Store in PostgreSQL
3

Publish Events

Send job events to Kafka in batches:
  • Topic: workflows or jobs (depending on workflow type)
  • Partition: Based on workflow ID (ensures ordering)
  • Transactional writes for exactly-once semantics
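The ordering guarantee comes from keying messages by workflow ID, so every event for one workflow hashes to the same partition. A sketch using FNV-1a for illustration (Kafka's default partitioner uses murmur2, but any stable hash preserves per-key ordering):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a workflow ID to a partition so that all events for
// that workflow land on the same partition and stay ordered.
func partitionFor(workflowID string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(workflowID))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	p1 := partitionFor("wf-123", 8)
	p2 := partitionFor("wf-123", 8)
	fmt.Println(p1 == p2) // same key always yields the same partition: true
}
```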
4

Update Last Execution

Update the workflow’s last_execution_time to the current time

Scaling Considerations

  • Single Instance: Typically runs as a single instance (no consumer group)
  • Concurrency: Uses database transactions to prevent duplicate job creation
  • Performance: Poll interval and batch size tune throughput
Running multiple Scheduling Workers requires database-level locking to prevent race conditions. Not recommended without proper coordination.

Workflow Worker

Purpose

Prepares Docker image configurations and execution environments for CONTAINER workflows.

Architecture

  • Redis: Caches execution templates
  • ClickHouse: Stores workflow metadata
  • Kafka: Consumes workflow events, publishes job events
  • Docker: Validates images and configurations
  • gRPC Clients: Communicates with Workflows, Jobs, and Notifications services
WorkflowWorker:
  ConsumerGroup: workflow-worker
  ParallelismLimit: 8    # Concurrent workflows to process

Workflow

1

Consume Workflow Event

Read from Kafka’s workflows topic:
{
  "job_id": "uuid",
  "workflow_id": "uuid",
  "scheduled_at": "2024-03-03T10:00:00Z"
}
2

Fetch Workflow Configuration

Call Workflows Service via gRPC:
  • Get workflow details (kind, payload, etc.)
  • Validate build status is COMPLETED
3

Parse Payload

Extract Docker configuration:
{
  "image": "python:3.11-alpine",
  "command": ["python", "-c"],
  "args": ["print('Hello')"],
  "env": {"KEY": "value"}
}
4

Validate Image

  • Check if image exists in Docker
  • Validate image name format
  • Ensure image is accessible
5

Prepare Execution Template

Create execution configuration:
  • Container name: chronoverse-{job_id}
  • Network: chronoverse
  • Environment variables
  • Resource limits
  • Logging configuration
6

Cache Configuration

Store execution template in Redis:
Key: job:{job_id}:config
TTL: 24 hours
Value: JSON execution template
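A minimal sketch of building the cache entry, assuming a pared-down template struct; the commented SET call shows how a go-redis client would apply the 24-hour TTL:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExecutionTemplate is a pared-down version of the cached configuration;
// the real template also carries network, resource, and logging settings.
type ExecutionTemplate struct {
	Image   string   `json:"image"`
	Command []string `json:"command"`
}

// cacheKey builds the job:{job_id}:config key described above.
func cacheKey(jobID string) string {
	return fmt.Sprintf("job:%s:config", jobID)
}

func main() {
	tmpl := ExecutionTemplate{Image: "python:3.11-alpine", Command: []string{"python", "-c"}}
	payload, _ := json.Marshal(tmpl)
	// A real worker would now write the entry with the 24h TTL, e.g.:
	// rdb.Set(ctx, cacheKey(jobID), payload, 24*time.Hour)
	fmt.Println(cacheKey("abc"), len(payload) > 0)
}
```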
7

Publish Job Event

Send to Kafka’s jobs topic:
{
  "job_id": "uuid",
  "workflow_id": "uuid",
  "execution_ready": true
}
8

Update Job Status

Call Jobs Service to update status to QUEUED

Error Handling

  • Invalid Image: Update workflow build status to FAILED
  • Configuration Error: Log error and fail job with details
  • Service Unavailable: Retry with exponential backoff (circuit breaker)
  • Timeout: Mark job as FAILED and log timeout

Scaling Considerations

  • Consumer Group: Multiple instances share the load
  • Parallelism: Each instance processes multiple workflows concurrently
  • Stateless: No shared state between instances
  • Kafka Partitions: Scale up to the number of topic partitions

Execution Worker

Purpose

Executes scheduled jobs (both HEARTBEAT and CONTAINER types) in isolated environments.

Architecture

  • Redis: Fetches execution templates and streams logs
  • Docker: Executes containers
  • Kafka: Consumes job events, publishes log and analytics events
  • gRPC Clients: Communicates with Workflows, Jobs, and Notifications services
ExecutionWorker:
  ConsumerGroup: execution-worker
  ParallelismLimit: 8    # Concurrent jobs to execute
  DockerHost: unix:///var/run/docker.sock

Workflow

1

Consume Job Event

Read from Kafka’s jobs topic:
{
  "job_id": "uuid",
  "workflow_id": "uuid",
  "user_id": "uuid"
}
2

Fetch Job Details

Call Jobs Service via gRPC to get:
  • Job ID and workflow ID
  • Workflow kind (HEARTBEAT or CONTAINER)
  • Current status
3

Update Status to RUNNING

  • Call Jobs Service to update status
  • Set started_at timestamp
  • Publish notification event
4

Execute Based on Kind

HEARTBEAT:
status := FAILED
if executeHeartbeat() {
    status = COMPLETED
}
CONTAINER:
  1. Fetch execution config from Redis
  2. Create Docker container
  3. Start container
  4. Stream logs (stdout/stderr)
  5. Wait for completion
  6. Capture exit code
container := createContainer(config)
logs := streamLogs(container)
exitCode := waitForCompletion(container)
status := FAILED
if exitCode == 0 {
    status = COMPLETED
}
5

Process Logs

For each log line:
  1. Add timestamp and sequence number
  2. Publish to Kafka’s job_logs topic
  3. Stream to Redis for real-time viewing
{
  "job_id": "uuid",
  "timestamp": "2024-03-03T10:00:01.123456789Z",
  "message": "Log line",
  "stream": "stdout",
  "sequence_num": 1
}
6

Update Final Status

  • Call Jobs Service with final status
  • Set completed_at timestamp
  • Update workflow consecutive failure count
  • Clean up resources (container, cache)
7

Publish Analytics Event

Send metrics to Kafka’s analytics topic:
{
  "job_id": "uuid",
  "workflow_id": "uuid",
  "status": "COMPLETED",
  "duration_ms": 1234,
  "exit_code": 0
}

Container Lifecycle Management

// Simplified sketch; the actual worker drives the Docker Go SDK
// (ContainerCreate, ContainerStart, ContainerLogs) with a context
// and full error handling.
container := dockerClient.CreateContainer(ContainerConfig{
	Name:  fmt.Sprintf("chronoverse-%s", jobID),
	Image: config.Image,
	Cmd:   config.Command,
	Env:   config.EnvVars,
	Labels: map[string]string{
		"chronoverse.job_id":      jobID,
		"chronoverse.workflow_id": workflowID,
	},
	HostConfig: HostConfig{
		NetworkMode: "chronoverse",
		AutoRemove:  false,
	},
})

logs := dockerClient.Logs(container, LogOptions{
	ShowStdout: true,
	ShowStderr: true,
	Follow:     true,
	Timestamps: true,
})

for log := range logs {
	publishToKafka(log)
	streamToRedis(log)
}
After job completion:
  1. Stop container (if still running)
  2. Remove container
  3. Delete Redis cache entries
  4. Remove any temporary volumes

Error Handling

  • Container Creation Failed: Log error, mark job as FAILED
  • Container Timeout: Stop container, mark as FAILED
  • Docker Daemon Unavailable: Retry with backoff, eventually fail
  • Log Streaming Error: Continue execution, log error separately

Scaling Considerations

  • Consumer Group: Multiple instances process jobs in parallel
  • Docker Access: Each instance needs Docker socket access
  • Resource Limits: Monitor Docker host resources
  • Network: All containers must access chronoverse network
Scale Execution Workers based on job volume and execution time. More workers = higher concurrent execution capacity.

JobLogs Processor

Purpose

Persists job logs from Kafka to ClickHouse and MeiliSearch for efficient storage and querying.

Architecture

  • Redis: Temporary log storage for running jobs
  • ClickHouse: Long-term log storage
  • MeiliSearch: Full-text search indexing
  • Kafka: Consumes log events
JobLogsProcessor:
  ConsumerGroup: joblogs-processor
  BatchJobLogsSizeLimit: 1000        # Max logs per batch
  BatchJobLogsTimeInterval: 5s       # Max time between flushes
  BatchAnalyticsTimeInterval: 10s    # Analytics batch interval

Workflow

1

Consume Log Events

Read from Kafka’s job_logs topic:
{
  "job_id": "uuid",
  "workflow_id": "uuid",
  "user_id": "uuid",
  "timestamp": "2024-03-03T10:00:01.123456789Z",
  "message": "Application log line",
  "stream": "stdout",
  "sequence_num": 1
}
2

Batch Accumulation

Accumulate logs until:
  • Batch size reaches limit (e.g., 1000 logs)
  • Time interval expires (e.g., 5 seconds)
  • Whichever comes first
3

Write to ClickHouse

Batch insert logs to ClickHouse:
INSERT INTO job_logs (
  job_id, workflow_id, user_id, timestamp,
  message, stream, sequence_num
) VALUES (...)
ClickHouse optimizations:
  • Column-oriented storage for compression
  • Time-based partitioning
  • Asynchronous inserts
4

Index in MeiliSearch

Add logs to search index:
{
  "id": "job_id:sequence_num",
  "job_id": "uuid",
  "message": "Log line",
  "timestamp": "2024-03-03T10:00:01Z",
  "stream": "stdout"
}
5

Commit Kafka Offset

After successful writes, commit offset to Kafka

Batching Strategy

When batch reaches configured size:
  • Reduces write operations
  • Improves throughput
  • May increase latency slightly
When time interval expires:
  • Ensures timely writes
  • Prevents indefinite accumulation
  • Handles low-volume scenarios

Error Handling

  • ClickHouse Unavailable: Retry with backoff, don’t commit offset
  • MeiliSearch Unavailable: Log error, continue (search is non-critical)
  • Batch Write Failed: Retry entire batch, eventually move to dead-letter queue

Scaling Considerations

  • Consumer Group: Multiple instances for parallel processing
  • Partition Assignment: Kafka rebalancing distributes load
  • Write Throughput: ClickHouse batching improves performance
  • Search Indexing: MeiliSearch can handle concurrent writes
JobLogs Processor is optimized for high-volume log ingestion. Scale based on log volume and write latency requirements.

Analytics Processor

Purpose

Consumes job and workflow events to generate analytics data and metrics.

Architecture

  • PostgreSQL: Stores aggregated analytics
  • Kafka: Consumes analytics events
AnalyticsProcessor:
  ConsumerGroup: analytics-processor
  BatchTimeInterval: 10s

Workflow

1

Consume Analytics Events

Read from Kafka’s analytics topic:
{
  "event_type": "job_completed",
  "job_id": "uuid",
  "workflow_id": "uuid",
  "status": "COMPLETED",
  "duration_ms": 1234,
  "timestamp": "2024-03-03T10:00:00Z"
}
2

Process Event

Extract metrics:
  • Job success/failure counts
  • Execution duration statistics
  • Workflow reliability metrics
  • Resource usage patterns
3

Aggregate Data

Update analytics tables in PostgreSQL:
UPDATE workflow_analytics
SET total_jobs = total_jobs + 1,
    total_duration_ms = total_duration_ms + {duration},
    last_execution = {timestamp}
WHERE workflow_id = {workflow_id}
4

Commit Offset

After successful write, commit Kafka offset

Metrics Tracked

Job Metrics

  • Total executions
  • Success rate
  • Failure rate
  • Average duration
  • Duration percentiles (p50, p95, p99)
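Percentiles can be derived from raw durations with a nearest-rank computation; a sketch (the real processor may use a different estimator or compute these in SQL):

```go
package main

import (
	"fmt"
	"sort"
)

// percentile returns the value at percentile p (0-100) from a slice of
// durations, using nearest-rank on a sorted copy of the input.
func percentile(durationsMs []int, p float64) int {
	if len(durationsMs) == 0 {
		return 0
	}
	sorted := append([]int(nil), durationsMs...)
	sort.Ints(sorted)
	rank := int(float64(len(sorted)-1) * p / 100.0)
	return sorted[rank]
}

func main() {
	durations := []int{120, 95, 340, 200, 150, 110, 980, 130, 160, 125}
	fmt.Println(percentile(durations, 50), percentile(durations, 95), percentile(durations, 99))
}
```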

Workflow Metrics

  • Active workflows count
  • Terminated workflows
  • Average interval
  • Consecutive failures trend

System Metrics

  • Total jobs processed
  • Worker utilization
  • Queue depth
  • Processing latency

User Metrics

  • Workflows per user
  • Jobs per user
  • Resource consumption
  • Activity patterns

Scaling Considerations

  • Consumer Group: Multiple instances for parallel processing
  • Database Writes: Use batching and upserts for efficiency
  • Aggregation: Consider time-windowed aggregations for large datasets

Worker Communication Flow

Complete Job Execution Flow

Scheduling Worker → Kafka (workflows topic) → Workflow Worker → Kafka (jobs topic) → Execution Worker → Kafka (job_logs and analytics topics) → JobLogs Processor / Analytics Processor. HEARTBEAT jobs are published directly to the jobs topic, skipping the Workflow Worker.

Monitoring Workers

Health Checks

All workers expose health metrics:
  • Kafka Consumer Lag: How far behind the latest message
  • Processing Rate: Messages processed per second
  • Error Rate: Failed message processing attempts
  • Resource Usage: CPU, memory, disk I/O

Observability

Workers integrate with OpenTelemetry:
  • Traces: Track message processing flow
  • Metrics: Consumer lag, processing time, error counts
  • Logs: Structured logging with context
View worker metrics in Grafana dashboards (LGTM stack) on port 3000.

Worker Best Practices

Configuration Tuning

1

Consumer Group Size

  • Start with 1-2 instances per worker type
  • Scale based on consumer lag metrics
  • Don’t exceed Kafka partition count
2

Parallelism Limits

  • Set based on available CPU cores
  • Monitor resource utilization
  • Account for I/O-bound operations
3

Batch Sizes

  • Larger batches = higher throughput, higher latency
  • Smaller batches = lower latency, more overhead
  • Tune based on workload characteristics

Error Handling

  • Transient Errors: Retry with exponential backoff
  • Permanent Errors: Log and move to dead-letter queue
  • Circuit Breakers: Prevent cascading failures
  • Graceful Degradation: Continue processing other messages

Resource Management

  • Connection Pooling: Reuse database connections
  • Memory Limits: Set container memory limits
  • Garbage Collection: Monitor and tune GC settings
  • Graceful Shutdown: Handle SIGTERM for clean exits

Next Steps

Architecture

Learn about overall system architecture

Workflows

Understand workflow concepts

Jobs

Learn about job execution

Deployment

Deploy and configure workers
