Apache Druid uses supervisors to manage streaming ingestion from external streaming sources. Supervisors oversee the state of indexing tasks to coordinate handoffs, manage failures, and ensure that scalability and replication requirements are maintained.
This guide uses the Apache Kafka term “offset” to refer to the identifier for records in a partition. For Amazon Kinesis, the equivalent is “sequence number”.

Supervisor Spec

Druid uses a JSON specification (supervisor spec) to define tasks for streaming ingestion or auto-compaction. The supervisor spec specifies how Druid should consume, process, and index data from an external stream.

Top-Level Properties

id
string
The unique identifier for the supervisor. If unspecified, defaults to the value of spec.dataSchema.dataSource.
type
string
required
The supervisor type. For streaming ingestion: kafka, kinesis, or rabbit. For automatic compaction: autocompact.
spec
object
required
The container object for the supervisor configuration.
spec.dataSchema
object
required
The schema for the indexing task to use during ingestion. See dataSchema.
spec.ioConfig
object
required
The I/O configuration object to define connection and I/O-related settings.
spec.tuningConfig
object
The tuning configuration object to define performance-related settings.
context
object
Extra configuration for both the supervisor and the tasks it spawns.
suspended
boolean
Puts the supervisor in a suspended state.
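Putting the top-level properties together, a minimal Kafka supervisor spec skeleton might look like the following (the id and the inner objects are placeholders):

```json
{
  "type": "kafka",
  "id": "my_kafka_supervisor",
  "spec": {
    "dataSchema": { /* dataSchema object */ },
    "ioConfig": { /* I/O configuration */ },
    "tuningConfig": { /* optional tuning configuration */ }
  },
  "suspended": false
}
```

Omitting id makes the supervisor ID default to the datasource name from spec.dataSchema.dataSource.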

I/O Configuration

The following properties apply to both Kafka and Kinesis ingestion. For source-specific properties, see Kafka I/O configuration and Kinesis I/O configuration.
inputFormat
object
required
The input format to define input data parsing. See input formats.
autoScalerConfig
object
Defines auto scaling behavior for ingestion tasks. See Task autoscaler.
taskCount
integer
default:1
The maximum number of reading tasks in a replica set. Total tasks = taskCount × replicas.
replicas
integer
default:1
The number of replica sets. Druid assigns task replicas to different workers for resiliency.
taskDuration
ISO 8601 period
default:"PT1H"
The length of time before tasks stop reading and begin publishing segments.
startDelay
ISO 8601 period
default:"PT5S"
The period to wait before the supervisor starts managing tasks.
period
ISO 8601 period
default:"PT30S"
How often the supervisor executes its management logic. Specifies the maximum time between iterations.
completionTimeout
ISO 8601 period
default:"PT30M"
How long to wait before declaring a publishing task as failed and terminating it.
lateMessageRejectionStartDateTime
ISO 8601 datetime
Reject messages with timestamps earlier than this datetime. Helps prevent concurrency issues with late messages.
lateMessageRejectionPeriod
ISO 8601 period
Reject messages with timestamps earlier than this period before the task was created.
earlyMessageRejectionPeriod
ISO 8601 period
Reject messages with timestamps later than this period after the task reached its duration.
stopTaskCount
integer
default:"taskCount value"
Limits the number of ingestion tasks that can cycle at any given time. See stopTaskCount.
serverPriorityToReplicas
object
Map of server priorities to the number of replicas per priority. Enables query isolation for mixed workloads. Example: {"1": 2, "0": 1} creates 2 replicas with priority 1 and 1 replica with priority 0.
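As an illustrative sketch, the shared properties above might combine into an ioConfig such as the following (the stream name and values are hypothetical):

```json
"ioConfig": {
  "stream": "clickstream",
  "inputFormat": { "type": "json" },
  "taskCount": 2,
  "replicas": 2,
  "taskDuration": "PT1H",
  "completionTimeout": "PT30M",
  "lateMessageRejectionPeriod": "PT1H"
}
```

With taskCount set to 2 and replicas set to 2, the supervisor runs 2 × 2 = 4 reading tasks.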

Task Autoscaler

Optionally configure autoscaling behavior for ingestion tasks using autoScalerConfig.
enableTaskAutoScaler
boolean
default:false
Enables the autoscaler. If omitted, the autoscaler remains disabled even when other autoScalerConfig properties are set.
taskCountMax
integer
required
Maximum number of ingestion tasks. Must be ≥ taskCountMin.
taskCountMin
integer
required
Minimum number of ingestion tasks. Overrides taskCount when autoscaler is enabled.
taskCountStart
integer
default:"taskCountMin"
Optional number of ingestion tasks to start with.
minTriggerScaleActionFrequencyMillis
long
default:600000
Minimum time interval between two scale actions.
autoScalerStrategy
string
default:"lagBased"
The autoscaling strategy. Druid supports lagBased and costBased (experimental).
stopTaskCountRatio
double
Ratio-based alternative to ioConfig.stopTaskCount, with range (0.0, 1.0]. Makes the maximum number of stoppable tasks proportional to the number of running tasks.

Lag-Based Autoscaler Strategy

For Kinesis, lag metrics are reported as time difference in milliseconds rather than message count.
lagCollectionIntervalMillis
long
default:30000
Time period during which Druid collects lag metric points.
lagCollectionRangeMillis
long
default:600000
Total time window of lag collection.
scaleOutThreshold
long
default:6000000
Threshold for scale out action.
triggerScaleOutFractionThreshold
double
default:0.3
Triggers a scale out action if this fraction of collected lag points is higher than scaleOutThreshold.
scaleInThreshold
long
default:1000000
Threshold for scale in action.
triggerScaleInFractionThreshold
double
default:0.9
Triggers a scale in action if this fraction of collected lag points is lower than scaleInThreshold.
scaleActionStartDelayMillis
long
default:300000
Delay after supervisor starts before the first scale logic check.
scaleActionPeriodMillis
long
default:60000
Frequency to check if a scale action is triggered.
scaleInStep
integer
default:1
Number of tasks to reduce when scaling down.
scaleOutStep
integer
default:2
Number of tasks to add when scaling out.
lagAggregate
string
default:"SUM"
Aggregate function for lag metric. Options: MAX, SUM, AVERAGE.
{
  "type": "kinesis",
  "spec": {
    "dataSchema": { /* ... */ },
    "ioConfig": {
      "stream": "metrics",
      "autoScalerConfig": {
        "enableTaskAutoScaler": true,
        "taskCountMax": 6,
        "taskCountMin": 2,
        "minTriggerScaleActionFrequencyMillis": 600000,
        "autoScalerStrategy": "lagBased",
        "lagCollectionIntervalMillis": 30000,
        "lagCollectionRangeMillis": 600000,
        "scaleOutThreshold": 600000,
        "triggerScaleOutFractionThreshold": 0.3,
        "scaleInThreshold": 100000,
        "triggerScaleInFractionThreshold": 0.9,
        "scaleActionStartDelayMillis": 300000,
        "scaleActionPeriodMillis": 60000,
        "scaleInStep": 1,
        "scaleOutStep": 2
      },
      "inputFormat": { "type": "json" },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H"
    },
    "tuningConfig": { "type": "kinesis" }
  }
}

Cost-Based Autoscaler Strategy

This autoscaler is experimental. Implementation details and cost function parameters are subject to change. Kinesis is not yet supported.
Computes required task count via cost function based on ingestion lag and poll-to-idle ratio. Task counts are selected from a bounded range derived from the current partitions-per-task ratio.
scaleActionPeriodMillis
long
default:600000
Frequency to check if a scale action is triggered.
lagWeight
double
Weight of extracted lag value in cost function.
idleWeight
double
Weight of extracted poll idle value in cost function.
useTaskCountBoundaries
boolean
default:false
Enables bounded partitions-per-task window when selecting task counts.
highLagThreshold
long
Per-partition lag threshold for burst scale-up. Set to > 0 to enable, < 0 to disable.
minScaleDownDelay
ISO 8601 duration
default:"PT30M"
Minimum duration between successful scale actions.
scaleDownDuringTaskRolloverOnly
boolean
default:false
Limits task scaling down to periods during task rollovers only.
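A cost-based autoScalerConfig could be sketched as follows. This strategy is experimental, and the weights and bounds shown here are illustrative values, not recommendations:

```json
"autoScalerConfig": {
  "enableTaskAutoScaler": true,
  "taskCountMin": 1,
  "taskCountMax": 8,
  "autoScalerStrategy": "costBased",
  "scaleActionPeriodMillis": 600000,
  "lagWeight": 0.7,
  "idleWeight": 0.3,
  "useTaskCountBoundaries": true,
  "minScaleDownDelay": "PT30M",
  "scaleDownDuringTaskRolloverOnly": true
}
```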

stopTaskCount

Before setting stopTaskCount, note:
  • Operations requiring all tasks to cycle (spec changes, partition changes) can cause lag without sufficient capacity
  • Task autoscaler ignores stopTaskCount when shutting down tasks for task count changes
  • If set below taskCount, Druid cycles longest running tasks first
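For example, an ioConfig that cycles at most half of its reading tasks at any given time might look like this (values are illustrative):

```json
"ioConfig": {
  "stream": "metrics",
  "inputFormat": { "type": "json" },
  "taskCount": 4,
  "stopTaskCount": 2,
  "taskDuration": "PT1H"
}
```

Here at most two tasks stop and publish at once, starting with the longest-running tasks.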

Tuning Configuration

The following properties apply to both Kafka and Kinesis. For source-specific properties, see Kafka tuning configuration and Kinesis tuning configuration.
type
string
required
The tuning type code: kafka or kinesis.
maxRowsInMemory
integer
default:150000
Number of rows to accumulate before persisting. Represents post-aggregation rows.
maxBytesInMemory
long
default:"1/6 of max JVM memory"
Number of bytes to accumulate in heap memory before persisting. Max heap usage = maxBytesInMemory * (2 + maxPendingPersists).
skipBytesInMemoryOverheadCheck
boolean
default:false
Exclude overhead object bytes from the maxBytesInMemory check.
maxRowsPerSegment
integer
default:5000000
Number of rows to store in a segment (post-aggregation). Handoff occurs when limits are reached.
maxTotalRows
long
default:20000000
Number of rows to aggregate across all segments (post-aggregation).
intermediateHandoffPeriod
ISO 8601 period
default:"P2147483647D"
Period that determines how often tasks hand off segments.
intermediatePersistPeriod
ISO 8601 period
default:"PT10M"
Period that determines the rate at which intermediate persists occur.
maxPendingPersists
integer
default:0
Maximum number of persists that can be pending but not started. Value of 0 means one persist can run concurrently with ingestion.
indexSpec
object
Segment storage format options to use at indexing time. See IndexSpec.
resetOffsetAutomatically
boolean
default:false
Reset partitions when offset is unavailable. If false, surfaces exception causing tasks to fail.
workerThreads
integer
default:"min(10, taskCount)"
Number of threads the supervisor uses to handle requests/responses for worker tasks.
chatRetries
integer
default:8
Number of HTTP request retries to indexing tasks before considering tasks unresponsive.
httpTimeout
ISO 8601 period
default:"PT10S"
Period of time to wait for HTTP response from an indexing task.
shutdownTimeout
ISO 8601 period
default:"PT80S"
Period of time to wait for supervisor to attempt graceful shutdown before exiting.
offsetFetchPeriod
ISO 8601 period
default:"PT30S"
How often supervisor queries streaming source and indexing tasks to fetch current offsets and calculate lag. Minimum value: PT5S.
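A tuningConfig combining several of these properties might look like the following sketch; the values shown are the documented defaults, made explicit:

```json
"tuningConfig": {
  "type": "kafka",
  "maxRowsInMemory": 150000,
  "maxRowsPerSegment": 5000000,
  "intermediatePersistPeriod": "PT10M",
  "resetOffsetAutomatically": false,
  "chatRetries": 8,
  "httpTimeout": "PT10S",
  "offsetFetchPeriod": "PT30S"
}
```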

Start a Supervisor

Start a new supervisor by submitting a supervisor spec via:
  1. Web console: Use the data loader
  2. API: POST to the Supervisor API
The supervisor persists in the configured metadata database. Only one supervisor with a given ID can exist at a time. Submitting a second spec with the same ID overwrites the previous one.

Schema and Configuration Changes

To make schema or configuration changes:
  1. Submit a new supervisor spec
  2. Overlord initiates graceful shutdown of existing supervisor
  3. Running supervisor signals tasks to stop reading and begin publishing
  4. New supervisor is created with updated configuration
  5. Updated schema is applied while retaining existing publishing tasks
  6. New tasks start at previous task offsets
Configuration changes can be applied without pausing ingestion.

Status Report

The supervisor status report contains the state of supervisor tasks and an array of recent exceptions. View status in the web console:
  1. Navigate to Supervisors view
  2. Click the supervisor ID
  3. Click Status in the left navigation

State Properties

  • state: Generic state applicable to any supervisor type
  • detailedState: Implementation-specific state with more insight
Possible state values: PENDING, RUNNING, SUSPENDED, STOPPING, IDLE, UNHEALTHY_SUPERVISOR, UNHEALTHY_TASKS

Detailed State Mapping

detailedState | state | Description
UNHEALTHY_SUPERVISOR | UNHEALTHY_SUPERVISOR | Supervisor encountered errors on previous iterations
UNHEALTHY_TASKS | UNHEALTHY_TASKS | Last tasks all failed
UNABLE_TO_CONNECT_TO_STREAM | UNHEALTHY_SUPERVISOR | Connectivity issues; never connected
LOST_CONTACT_WITH_STREAM | UNHEALTHY_SUPERVISOR | Connectivity issues; previously connected
PENDING | PENDING | Initialized but not started
CONNECTING_TO_STREAM | RUNNING | Trying to connect to the stream
DISCOVERING_INITIAL_TASKS | RUNNING | Discovering already-running tasks
CREATING_TASKS | RUNNING | Creating tasks and discovering state
RUNNING | RUNNING | Tasks started; waiting for taskDuration
IDLE | IDLE | No new data; existing data read
SUSPENDED | SUSPENDED | Supervisor suspended
STOPPING | STOPPING | Supervisor stopping
{
  "dataSource": "social_media",
  "stream": "social_media",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_social_media_8ff3096f21fe448_jajnddno",
      "startingOffsets": { "0": 0 },
      "startTime": "2024-01-30T21:21:41.696Z",
      "remainingSeconds": 479,
      "type": "ACTIVE",
      "currentOffsets": { "0": 50000 },
      "lag": { "0": 0 }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": { "0": 50000 },
  "minimumLag": { "0": 0 },
  "aggregateLag": 0,
  "offsetsLastUpdated": "2024-01-30T22:13:19.335Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
For Kafka, consumer lag per partition may be reported as negative if the supervisor hasn’t received the latest offset response. Aggregate lag is always ≥ 0.

SUPERVISORS System Table

Query the sys.supervisors table to retrieve supervisor information:
SELECT * FROM sys.supervisors WHERE healthy=0;
See SUPERVISORS table for more information.

Manage a Supervisor

Manage supervisors via the web console or Supervisor API. In the web console:
  1. Navigate to Supervisors view
  2. Click the ellipsis in the Actions column
  3. Select the desired action

Suspend

Pauses a running supervisor. The suspended supervisor continues to emit logs and metrics. Indexing tasks remain suspended until you resume. API: Suspend a running supervisor

Set Offsets

Perform with caution as it may result in skipped messages and data loss or duplication.
Resets offsets for supervisor partitions. This:
  1. Clears stored offsets
  2. Instructs supervisor to resume from specified offsets
  3. Terminates and recreates active tasks for specified partitions
  4. Saves specified offsets in metadata store if none exist
API: Reset offsets for a supervisor
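The request body names the partitions and the offsets to resume from. A sketch for a Kafka supervisor follows; the topic name and offsets are hypothetical, and you should check the Supervisor API reference for the exact payload shape:

```json
{
  "type": "kafka",
  "partitions": {
    "type": "end",
    "stream": "social_media",
    "partitionOffsetMap": {
      "0": 100,
      "2": 650
    }
  }
}
```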

Hard Reset

Perform with caution as it may result in skipped messages and data loss or duplication.
Clears supervisor metadata, causing data reading to resume from earliest or latest position based on useEarliestOffset. Terminates and recreates active tasks. Use to recover from a stopped state due to missing offsets. API: Reset a supervisor

Terminate

Stops a supervisor and its indexing tasks, triggering segment publishing. Places a tombstone marker in the metadata store to prevent reloading on restart. The terminated supervisor still exists in metadata store and its history can be retrieved. API: Terminate a supervisor

Capacity Planning

Indexing tasks run on Middle Managers and are limited by available resources. Ensure sufficient worker capacity configured via druid.worker.capacity.
Worker capacity is shared across all indexing task types: batch processing, streaming tasks, and merging tasks.
A running task can be in one of two states:
  • Reading: For the period defined in taskDuration
  • Publishing: Until segments are generated, pushed to deep storage, and loaded by Historical services, or until completionTimeout elapses

Minimum Capacity Formula

Number of reading tasks = replicas × taskCount. (Exception: if taskCount exceeds the number of shards or partitions, Druid uses the number of shards or partitions instead.)
For reading and publishing tasks to run concurrently:
workerCapacity = 2 * replicas * taskCount
For example, with replicas = 2 and taskCount = 3, plan for at least 2 × 2 × 3 = 12 worker slots. This covers the ideal case, in which one task set publishes while another reads.
If time-to-publish exceeds taskDuration, multiple task sets may publish simultaneously, requiring additional capacity.
Set taskDuration large enough that the previous task set finishes publishing before the current set begins.

Multi-Supervisor Support

Druid supports any number of stream supervisors (Kafka, Kinesis, and so on) ingesting into the same datasource simultaneously.
To ensure proper synchronization with multiple supervisors, set useConcurrentLocks=true in the context field of the supervisor spec.
See Concurrent append and replace for more information.
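For example, each supervisor spec writing to the shared datasource would carry the lock setting in its top-level context object:

```json
{
  "type": "kinesis",
  "spec": { /* dataSchema, ioConfig, tuningConfig */ },
  "context": {
    "useConcurrentLocks": true
  }
}
```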

Supervisor API

Manage and monitor supervisors via API

Kafka Ingestion

Apache Kafka streaming ingestion

Kinesis Ingestion

Amazon Kinesis streaming ingestion

Automatic Compaction

Automatic compaction with supervisors
