When you enable the Kafka indexing service, you can configure supervisors on the Overlord to manage the creation and lifetime of Kafka indexing tasks. Kafka indexing tasks read events using Kafka's own partition and offset mechanism to guarantee exactly-once ingestion.
You must be on Apache Kafka version 0.11.x or higher. For older versions, refer to the Apache Kafka upgrade guide.

Setup

To use the Kafka indexing service, load the druid-kafka-indexing-service extension on both the Overlord and Middle Manager. See Loading extensions for more information.
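For example, you can add the extension to the load list in common.runtime.properties on those services (a minimal sketch; a real deployment's load list likely names additional extensions):
druid.extensions.loadList=["druid-kafka-indexing-service"]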

Supervisor Spec Configuration

This section covers Kafka-specific configuration properties. For properties shared across all streaming methods, see Supervisor spec.
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "metrics-kafka",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [],
        "dimensionExclusions": [
          "timestamp",
          "value"
        ]
      },
      "metricsSpec": [
        {
          "name": "count",
          "type": "count"
        },
        {
          "name": "value_sum",
          "fieldName": "value",
          "type": "doubleSum"
        },
        {
          "name": "value_min",
          "fieldName": "value",
          "type": "doubleMin"
        },
        {
          "name": "value_max",
          "fieldName": "value",
          "type": "doubleMax"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE"
      }
    },
    "ioConfig": {
      "topic": "metrics",
      "inputFormat": {
        "type": "json"
      },
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000
    }
  }
}
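To submit a spec like the one above, POST it to the Overlord's supervisor endpoint. A sketch assuming the Overlord (or a combined Coordinator/Overlord, as in the quickstart) listens on localhost:8081 and the spec is saved as supervisor-spec.json:
curl -X POST -H "Content-Type: application/json" \
  -d @supervisor-spec.json \
  "http://localhost:8081/druid/indexer/v1/supervisor"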

I/O Configuration

Kafka-specific ioConfig properties:
topic
string
required
The Kafka topic to read from. Once set, this value cannot be updated. To ingest from multiple topics, use topicPattern instead.
topicPattern
string
A regex pattern matching the Kafka topics to read from. Specify either topic or topicPattern, not both. See Ingest from multiple topics.
consumerProperties
object
required
A map of properties to pass to the Kafka consumer. Must include bootstrap.servers at minimum.
pollTimeout
long
default:100
The length of time to wait for the Kafka consumer to poll records, in milliseconds.
useEarliestOffset
boolean
default:false
When a supervisor manages a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets. Under normal circumstances, subsequent tasks start from where the previous segments ended, so this flag is only used on the first run.
idleConfig
object
Defines how and when the Kafka supervisor can become idle. See Idle configuration.
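Putting a few of these together, an illustrative ioConfig fragment that starts from the earliest offsets and uses a longer poll timeout (the values are examples, not recommendations):
"ioConfig": {
  "topic": "metrics",
  "inputFormat": { "type": "json" },
  "consumerProperties": { "bootstrap.servers": "localhost:9092" },
  "useEarliestOffset": true,
  "pollTimeout": 500
}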

Ingest from Multiple Topics

Downgrading to a version older than 28.0.0 will cause ingestion to fail for datasources using multi-topic ingestion.
Migrating an existing supervisor from topic to topicPattern is not supported. You must suspend the supervisor, reset its offsets, and resubmit it, as shown in the sketch at the end of this section.
To ingest from multiple topics, use the topicPattern property with a regex pattern:
{
  "ioConfig": {
    "topicPattern": "clicks|impressions",
    // ... other config
  }
}
For topics starting with “metrics-”:
{
  "ioConfig": {
    "topicPattern": "metrics-.*",
    // ... other config
  }
}
Druid automatically starts ingesting from new topics matching the regex pattern.
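The migration steps described above can be driven through the supervisor API. A hedged sketch, assuming a supervisor named metrics-kafka, an Overlord at localhost:8081, and an updated spec using topicPattern saved as supervisor-spec.json:
# Suspend the supervisor so its current tasks stop and publish their segments
curl -X POST "http://localhost:8081/druid/indexer/v1/supervisor/metrics-kafka/suspend"

# Reset the supervisor's stored offsets
curl -X POST "http://localhost:8081/druid/indexer/v1/supervisor/metrics-kafka/reset"

# Resubmit the updated spec
curl -X POST -H "Content-Type: application/json" \
  -d @supervisor-spec.json \
  "http://localhost:8081/druid/indexer/v1/supervisor"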

Consumer Properties

Consumer properties control how a supervisor reads events from Kafka. You must include bootstrap.servers with a list of Kafka brokers:
<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...
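For example, with hypothetical broker hostnames:
"bootstrap.servers": "kafka01.example.com:9092,kafka02.example.com:9092,kafka03.example.com:9092"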
To enable SSL connections with SASL authentication, set environment variables for the Druid user on Overlord and Peon machines:
export KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='accesskey' password='secret key';"
export SSL_KEY_PASSWORD=mysecretkeypassword
export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword
export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword
Then reference these in the supervisor spec using the dynamic config provider:
"consumerProperties": {
  "bootstrap.servers": "localhost:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "ssl.keystore.location": "/opt/kafka/config/kafka01.keystore.jks",
  "ssl.truststore.location": "/opt/kafka/config/kafka.truststore.jks",
  "druid.dynamic.config.provider": {
    "type": "environment",
    "variables": {
      "sasl.jaas.config": "KAFKA_JAAS_CONFIG",
      "ssl.key.password": "SSL_KEY_PASSWORD",
      "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
      "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD"
    }
  }
}
The isolation.level property determines how Druid reads transactional messages:
  • read_uncommitted (default): Reads all messages, including those from aborted transactions
  • read_committed: Reads non-transactional messages and only those transactional messages that were committed
"consumerProperties": {
  "bootstrap.servers": "localhost:9092",
  "isolation.level": "read_committed"
}

Idle Configuration

Idle state transitioning is experimental.
When the supervisor enters idle state, no new tasks are launched after current tasks complete. This can reduce costs for topics with sporadic data.
enabled
boolean
default:false
If true, the supervisor becomes idle when there’s no data on the input topic for some time.
inactiveAfterMillis
long
default:600000
The supervisor becomes idle if all data has been read and no new data is published for this many milliseconds.
{
  "type": "kafka",
  "spec": {
    "dataSchema": { /* ... */ },
    "ioConfig": {
      "topic": "metrics",
      "inputFormat": { "type": "json" },
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H",
      "idleConfig": {
        "enabled": true,
        "inactiveAfterMillis": 600000
      }
    },
    "tuningConfig": { /* ... */ }
  }
}
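To confirm that a supervisor has gone idle, check its status endpoint; an idle supervisor reports an IDLE state in the response. A sketch assuming a supervisor id of metrics-kafka and the Overlord at localhost:8081:
curl "http://localhost:8081/druid/indexer/v1/supervisor/metrics-kafka/status"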

Kafka Input Format

The kafka input format lets you parse Kafka metadata fields in addition to the payload value contents.

Supported Metadata Fields

  • Kafka timestamp: Event timestamp from Kafka
  • Kafka topic: Topic name
  • Kafka headers: Event headers
  • Kafka key: Message key

Configuration

valueFormat
object
required
Define how to parse the payload value (e.g., { "type": "json" }).
timestampColumnName
string
default:"kafka.timestamp"
Custom name for the Kafka timestamp in the Druid schema.
topicColumnName
string
default:"kafka.topic"
Custom name for the Kafka topic in the Druid schema. Useful when ingesting from multiple topics.
headerFormat
object
default:"string"
Format for parsing Kafka headers. Only the string type is currently supported; it decodes header values as UTF-8 by default, and you can set its encoding property to another charset such as ISO-8859-1, US-ASCII, UTF-16, UTF-16BE, or UTF-16LE.
headerColumnPrefix
string
default:"kafka.header."
Prefix for Kafka header columns to avoid conflicts with payload columns.
keyFormat
object
Input format to parse the key. Only the first value is used.
keyColumnName
string
default:"kafka.key"
Name for the Kafka key column.
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "wiki-edits",
      "inputFormat": {
        "type": "kafka",
        "valueFormat": {
          "type": "json"
        },
        "headerFormat": {
          "type": "string"
        },
        "keyFormat": {
          "type": "tsv",
          "findColumnsFromHeader": false,
          "columns": ["x"]
        }
      },
      "useEarliestOffset": true
    },
    "dataSchema": {
      "dataSource": "wikiticker",
      "timestampSpec": {
        "column": "timestamp",
        "format": "posix"
      },
      "dimensionsSpec": {
        "useSchemaDiscovery": true,
        "includeAllDimensions": true
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    },
    "tuningConfig": {
      "type": "kafka"
    }
  }
}
Example Kafka message:
// Kafka timestamp: 1680795276351
// Kafka topic: wiki-edits
// Kafka headers: env=development, zone=z1
// Kafka key: wiki-edit
// Kafka payload value:
{
  "channel": "#sv.wikipedia",
  "timestamp": "2016-06-27T00:00:11.080Z",
  "page": "Salo Toraut",
  "delta": 31,
  "namespace": "Main"
}
Parsed result:
{
  "channel": "#sv.wikipedia",
  "timestamp": "2016-06-27T00:00:11.080Z",
  "page": "Salo Toraut",
  "delta": 31,
  "namespace": "Main",
  "kafka.timestamp": 1680795276351,
  "kafka.topic": "wiki-edits",
  "kafka.header.env": "development",
  "kafka.header.zone": "z1",
  "kafka.key": "wiki-edit"
}

Tuning Configuration

Kafka-specific tuningConfig properties:
numPersistThreads
integer
default:1
Number of threads to use to create and persist incremental segments on disk. Increase for datasources with hundreds or thousands of columns.
For shared tuning properties, see Supervisor tuning configuration.
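For instance, an illustrative tuningConfig that raises the persist thread count for a very wide datasource (the value is an example, not a recommendation):
"tuningConfig": {
  "type": "kafka",
  "numPersistThreads": 4
}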

Deployment Notes

Druid assigns Kafka partitions to each Kafka indexing task. A task writes events from Kafka into a single segment until it reaches one of the following limits:
  • maxRowsPerSegment
  • maxTotalRows
  • intermediateHandoffPeriod
When a task reaches one of these limits, it creates a new segment partition for subsequent events.
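All three limits are set in the tuningConfig. An illustrative combination (values are examples only):
"tuningConfig": {
  "type": "kafka",
  "maxRowsPerSegment": 5000000,
  "maxTotalRows": 20000000,
  "intermediateHandoffPeriod": "PT30M"
}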

Incremental Hand-offs

Tasks perform incremental hand-offs, so segments become available as they’re ready. When limits are reached:
  1. Task hands off all segments
  2. Creates a new set of segments
  3. Continues ingestion
This allows tasks to run for longer durations without accumulating old segments locally.

Small Segments

Small segments may still be produced. For example:
  • Task duration: 4 hours
  • Segment granularity: HOUR
  • Supervisor started: 9:10
At 13:10, new tasks start. Events for 13:00-14:00 may split across old and new tasks, creating small segments. To merge small segments, schedule re-indexing tasks with a different segment granularity. See Segment size optimization.
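One way to merge them is Coordinator auto-compaction. A hedged sketch that compacts the example datasource into day-granularity segments, assuming a quickstart-style deployment where the Coordinator listens on localhost:8081 and that auto-compaction suits your retention and query patterns:
curl -X POST -H "Content-Type: application/json" \
  -d '{ "dataSource": "metrics-kafka", "granularitySpec": { "segmentGranularity": "DAY" } }' \
  "http://localhost:8081/druid/coordinator/v1/config/compaction"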

See Also

  • Supervisor API: manage and monitor supervisors via API
  • Supervisor Reference: supervisor status and capacity planning
  • Kafka Tutorial: load data from Apache Kafka
  • Data Formats: supported input formats
