When you enable the Kinesis indexing service, you can configure supervisors on the Overlord to manage the creation and lifetime of Kinesis indexing tasks. Kinesis indexing tasks read events using the Kinesis shard and sequence number mechanism to guarantee exactly-once ingestion.

Setup

To use the Kinesis indexing service, load the druid-kinesis-indexing-service core extension on both the Overlord and Middle Manager. See Loading extensions for more information.
Review Known issues before deploying to production.

Supervisor Spec Configuration

This section covers Kinesis-specific configuration properties. For properties shared across all streaming methods, see Supervisor spec.
{
  "type": "kinesis",
  "spec": {
    "ioConfig": {
      "type": "kinesis",
      "stream": "KinesisStream",
      "inputFormat": {
        "type": "json"
      },
      "useEarliestSequenceNumber": true
    },
    "tuningConfig": {
      "type": "kinesis"
    },
    "dataSchema": {
      "dataSource": "KinesisStream",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": [
          "isRobot",
          "channel",
          "flags",
          "isUnpatrolled",
          "page",
          "diffUrl",
          { "type": "long", "name": "added" },
          "comment",
          { "type": "long", "name": "commentLength" },
          "isNew",
          "isMinor",
          { "type": "long", "name": "delta" },
          "isAnonymous",
          "user",
          { "type": "long", "name": "deltaBucket" },
          { "type": "long", "name": "deleted" },
          "namespace",
          "cityName",
          "countryName",
          "regionIsoCode",
          "metroCode",
          "countryIsoCode",
          "regionName"
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "hour"
      }
    }
  }
}
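A spec like the one above is submitted to the Overlord over HTTP. The following is a minimal sketch, not a definitive client: it only builds the POST request for the `/druid/indexer/v1/supervisor` endpoint. The Overlord address and the helper name `build_supervisor_request` are assumptions for illustration, and the example spec is trimmed (a real submission needs the full spec, including dataSchema).

```python
import json
import urllib.request

# Assumption: the Overlord is reachable at this address; adjust for your cluster.
OVERLORD_URL = "http://localhost:8090"


def build_supervisor_request(spec: dict, overlord_url: str = OVERLORD_URL) -> urllib.request.Request:
    """Build (but do not send) the POST request that submits a supervisor spec."""
    return urllib.request.Request(
        url=overlord_url.rstrip("/") + "/druid/indexer/v1/supervisor",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed version of the spec above; dataSchema is omitted for brevity.
example_spec = {
    "type": "kinesis",
    "spec": {
        "ioConfig": {
            "type": "kinesis",
            "stream": "KinesisStream",
            "inputFormat": {"type": "json"},
            "useEarliestSequenceNumber": True,
        },
        "tuningConfig": {"type": "kinesis"},
    },
}

request = build_supervisor_request(example_spec)
# To actually submit the spec: urllib.request.urlopen(request)
```

Building the request separately from sending it makes it easy to inspect the payload before it reaches a live cluster.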

I/O Configuration

Kinesis-specific ioConfig properties:
  • stream (string, required): The Kinesis stream to read.
  • endpoint (string, default: "kinesis.us-east-1.amazonaws.com"): The AWS Kinesis stream endpoint for a region. See AWS service endpoints.
  • useEarliestSequenceNumber (boolean, default: false): If a supervisor manages a datasource for the first time, this determines whether to retrieve the earliest or latest sequence numbers in Kinesis.
  • fetchDelayMillis (integer, default: 0): Time in milliseconds to wait between subsequent calls to fetch records from Kinesis. See Determine fetch settings.
  • awsAssumedRoleArn (string): The AWS assumed role to use for additional permissions.
  • awsExternalId (string): The AWS external ID to use for additional permissions.
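As an illustration of how these properties fit together, here is a minimal sketch that assembles a Kinesis ioConfig for a given region. The helper `build_io_config` is hypothetical, and the `kinesis.<region>.amazonaws.com` endpoint pattern is an assumption that holds for standard commercial regions only; check AWS service endpoints for others. A working ioConfig also needs an inputFormat, which is left out here.

```python
from typing import Optional


def build_io_config(
    stream: str,
    region: str = "us-east-1",
    use_earliest: bool = False,
    fetch_delay_millis: int = 0,
    assumed_role_arn: Optional[str] = None,
    external_id: Optional[str] = None,
) -> dict:
    """Assemble the Kinesis-specific part of an ioConfig (hypothetical helper)."""
    io_config = {
        "type": "kinesis",
        "stream": stream,
        # Assumption: standard commercial-region endpoint naming.
        "endpoint": f"kinesis.{region}.amazonaws.com",
        "useEarliestSequenceNumber": use_earliest,
        "fetchDelayMillis": fetch_delay_millis,
    }
    # Optional properties are only emitted when set.
    if assumed_role_arn is not None:
        io_config["awsAssumedRoleArn"] = assumed_role_arn
    if external_id is not None:
        io_config["awsExternalId"] = external_id
    return io_config


io_config = build_io_config("KinesisStream", region="us-west-2", use_earliest=True)
```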

Data Format

The Kinesis indexing service supports both inputFormat and parser to specify the data format. Use inputFormat unless you need a format only supported by the legacy parser. Supported inputFormat values:
  • kinesis
  • csv
  • tsv
  • json
  • avro_stream
  • protobuf
You can use parser to read thrift formats.

Tuning Configuration

Kinesis-specific tuningConfig properties:
  • skipSequenceNumberAvailabilityCheck (boolean, default: false): Whether to skip the check that the current sequence number is still available in a Kinesis shard. If false, the check runs; when the sequence number is no longer available, the task attempts to reset the current sequence number, depending on the value of resetOffsetAutomatically.
  • recordBufferSizeBytes (integer): The size of the buffer (heap memory bytes) between Kinesis fetch threads and the main ingestion thread. See Determine fetch settings for defaults.
  • recordBufferOfferTimeout (integer, default: 5000): Milliseconds to wait for space in the buffer before timing out.
  • recordBufferFullWait (integer, default: 5000): Milliseconds to wait for the buffer to drain before attempting to fetch records again.
  • fetchThreads (integer, default: procs * 2): Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.
  • maxBytesPerPoll (integer, default: 1000000): Maximum bytes to fetch from the buffer per poll. At least one record is polled regardless of this setting.
  • repartitionTransitionDuration (ISO 8601 period, default: "PT2M"): When shards split or merge, how long to wait for the stream to write records to the new shards. Helps avoid issues with empty shard handling.
  • useListShards (boolean, default: false): If true, use the listShards API to prevent LimitExceededException. Requires appropriate IAM permissions.

AWS Authentication

Druid uses AWS access and secret keys to authenticate Kinesis API requests. Druid looks for credentials in this order:
  1. Environment variables
  2. Web Identity Token
  3. Default profile configuration file
  4. EC2 instance profile provider

Option 2: Long-term Security Credentials

AWS does not recommend providing long-term credentials in configuration files as it poses a security risk.
You can provide credentials in common.runtime.properties:
druid.kinesis.accessKey=AKIAWxxxxxxxxxx4NCKS
druid.kinesis.secretKey=Jbytxxxxxxxxxxx2+555

Required IAM Permissions

Permissions depend on the useListShards flag.
Required permissions:
  • ListStreams - List your data streams
  • Get* - Required for GetShardIterator
  • GetRecords - Get data records from shards
  • ListShards - Get shards for a stream
Example Policy
[
  {
    "Effect": "Allow",
    "Action": ["kinesis:List*"],
    "Resource": ["*"]
  },
  {
    "Effect": "Allow",
    "Action": ["kinesis:Get*"],
    "Resource": ["<ARN for shards to be ingested>"]
  }
]
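The statements above form the Statement array of an IAM policy document. The following is a minimal sketch that wraps them into a complete document for a given resource ARN. The helper `build_kinesis_policy` and the example ARN (including its account ID) are hypothetical; the `Version` value is the standard IAM policy language version.

```python
import json


def build_kinesis_policy(resource_arn: str) -> dict:
    """Wrap the example statements in a full IAM policy document."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            # List* actions are granted on all resources, as in the example.
            {"Effect": "Allow", "Action": ["kinesis:List*"], "Resource": ["*"]},
            # Get* actions are scoped to the stream being ingested.
            {"Effect": "Allow", "Action": ["kinesis:Get*"], "Resource": [resource_arn]},
        ],
    }


# Hypothetical ARN, for illustration only.
policy = build_kinesis_policy("arn:aws:kinesis:us-east-1:123456789012:stream/KinesisStream")
policy_json = json.dumps(policy, indent=2)
```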

Shards and Segment Handoff

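Each Kinesis indexing task writes the events it consumes from Kinesis shards into a single segment for the segment granularity interval until it reaches one of the following limits:
  • maxRowsPerSegment
  • maxTotalRows
  • intermediateHandoffPeriod
At this point, the task creates a new Druid segment partition (also called a shard, not to be confused with a Kinesis shard) for that segment granularity interval to hold subsequent events.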
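The limit check described above can be sketched as follows. This is an illustration of the rule, not Druid's implementation: the function name `reached_limit` is hypothetical, the use of `>=` at the boundary is an assumption, and intermediateHandoffPeriod (an ISO 8601 period in the spec) is represented as a `timedelta` for simplicity.

```python
from datetime import timedelta


def reached_limit(
    rows_in_segment: int,
    total_rows: int,
    elapsed: timedelta,
    max_rows_per_segment: int,
    max_total_rows: int,
    intermediate_handoff_period: timedelta,
) -> bool:
    """Return True when any one of the three limits has been reached."""
    return (
        rows_in_segment >= max_rows_per_segment
        or total_rows >= max_total_rows
        or elapsed >= intermediate_handoff_period
    )


# Example: the per-segment row limit is hit while the other two are not.
hit = reached_limit(
    rows_in_segment=5_000_000,
    total_rows=6_000_000,
    elapsed=timedelta(minutes=20),
    max_rows_per_segment=5_000_000,
    max_total_rows=20_000_000,
    intermediate_handoff_period=timedelta(hours=1),
)
```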

Incremental Hand-offs

Tasks perform incremental hand-offs so segments are available as they’re created. When limits are reached:
  1. Task hands off all segments
  2. Creates a new set of segments
  3. Continues ingestion
This allows tasks to run for longer durations without accumulating old segments locally.

Small Segments

Small segments may still be produced. For example:
  • Task duration: 4 hours
  • Segment granularity: HOUR
  • Supervisor started: 9:10
At 13:10, new tasks start. Events for 13:00-14:00 may split across tasks, creating small segments. To merge small segments, schedule re-indexing tasks with a different segment granularity. See Segment size optimization.
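The arithmetic behind this example can be sketched as below: given a supervisor start time and a task duration, find the hourly intervals that straddle a task rollover and are therefore candidates for small segments. The function `split_hours` and the calendar date are hypothetical; only the 9:10 start and the 4-hour duration come from the example.

```python
from datetime import datetime, timedelta


def split_hours(start: datetime, task_duration: timedelta, num_tasks: int):
    """Return the hourly intervals in which one task ends and the next begins."""
    split = []
    for i in range(1, num_tasks):
        rollover = start + i * task_duration
        hour_start = rollover.replace(minute=0, second=0, microsecond=0)
        # A rollover exactly on the hour does not split any hourly interval.
        if rollover != hour_start:
            split.append((hour_start, hour_start + timedelta(hours=1)))
    return split


# Supervisor started at 9:10 with 4-hour tasks; the date is arbitrary.
intervals = split_hours(datetime(2024, 1, 1, 9, 10), timedelta(hours=4), num_tasks=3)
for begin, end in intervals:
    print(f"{begin:%H:%M}-{end:%H:%M} may be split across tasks")
```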

Determine Fetch Settings

Kinesis indexing tasks fetch records using fetchThreads threads. If fetchThreads exceeds the number of Kinesis shards, excess threads remain unused. Each fetch thread:
  • Fetches up to 10 MB of records at once
  • Has a delay of fetchDelayMillis between fetches
  • Pushes records into a shared queue of size recordBufferSizeBytes

Default Values

  • fetchThreads: 2 * processors available to task
    • Limited to prevent total fetched data from exceeding 5% of max heap
    • Processors available = total processors / druid.worker.capacity
  • fetchDelayMillis: 0 (no delay)
  • recordBufferSizeBytes: Smaller of 100 MB or 10% of available heap
  • maxBytesPerPoll: 1000000
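The default rules above can be worked through numerically. This is a rough sketch under stated assumptions rather than Druid's actual code: "10 MB" and "100 MB" are taken as 10^7 and 10^8 bytes, each fetch thread is assumed to hold at most one 10 MB fetch at a time, at least one thread is always allowed, and max heap stands in for "available heap". The function names are hypothetical.

```python
FETCH_BYTES_PER_THREAD = 10_000_000  # assumption: "10 MB" as 10^7 bytes
MAX_RECORD_BUFFER_BYTES = 100_000_000  # assumption: "100 MB" as 10^8 bytes


def default_fetch_threads(total_processors: int, worker_capacity: int, max_heap_bytes: int) -> int:
    """2 * processors available to the task, capped by 5% of max heap."""
    available = max(1, total_processors // worker_capacity)
    wanted = 2 * available
    heap_budget = max_heap_bytes * 5 // 100
    cap = max(1, heap_budget // FETCH_BYTES_PER_THREAD)
    return min(wanted, cap)


def default_record_buffer_size_bytes(heap_bytes: int) -> int:
    """Smaller of 100 MB or 10% of heap."""
    return min(MAX_RECORD_BUFFER_BYTES, heap_bytes * 10 // 100)


# 16 processors shared by 4 task slots, 2 GB heap: the processor rule wins.
threads_a = default_fetch_threads(16, 4, 2_000_000_000)
# 16 processors shared by 2 task slots, 1 GB heap: the heap cap wins.
threads_b = default_fetch_threads(16, 2, 1_000_000_000)
```

In the first case the task gets 4 processors, so it wants 8 threads and the heap cap of 10 does not apply. In the second it wants 16 threads but 5% of heap only covers 5 concurrent 10 MB fetches.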

Kinesis API Limits

  • Each data record: Up to 1 MB
  • Each shard: Up to 5 transactions per second for reads
  • Each shard: Up to 2 MB per second
  • GetRecords max return: 10 MB
If limits are exceeded, Kinesis throws ProvisionedThroughputExceededException. Druid tasks pause for the larger of fetchDelayMillis or 3 seconds, then retry.
Default settings are sufficient for most use cases. Adjust only if you need finer control over fetch rate and memory usage based on:
  • Average record size
  • Number of consumers reading from the shard
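Two of the numbers above lend themselves to a quick calculation. The sketch below shows the retry pause (the larger of fetchDelayMillis or 3 seconds) and a back-of-envelope lower bound for fetchDelayMillis when several consumers share one shard's 5 reads per second. The second function is an estimate, not a documented Druid formula: it assumes consumers split the read quota evenly and ignores request latency. Both function names are hypothetical.

```python
import math

READS_PER_SECOND_PER_SHARD = 5
MIN_RETRY_PAUSE_MILLIS = 3000


def retry_pause_millis(fetch_delay_millis: int) -> int:
    """Pause after ProvisionedThroughputExceededException before retrying."""
    return max(fetch_delay_millis, MIN_RETRY_PAUSE_MILLIS)


def min_fetch_delay_millis(consumers_per_shard: int) -> int:
    """Estimated delay that keeps one consumer within its share of the read quota."""
    return math.ceil(1000 * consumers_per_shard / READS_PER_SECOND_PER_SHARD)


pause_default = retry_pause_millis(0)      # default fetchDelayMillis
delay_three = min_fetch_delay_millis(3)    # three consumers on one shard
```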

Deaggregation

The Kinesis indexing service supports de-aggregation of multiple rows stored within a single Kinesis Data Streams record for more efficient data transfer.

Resharding

Resharding lets you adjust the number of shards in a stream to adapt to changes in data flow rate. During resharding:
  1. Ingestion tasks may shut down early
  2. Some tasks may fail
  3. The supervisor updates shard-to-task-group mappings
  4. Tasks with closed shards are shut down
  5. Active shards are rebalanced across tasks
The resharding window concludes when:
  • All closed shards are fully read and published
  • Tasks with inactive shard assignments are shut down
When the supervisor detects new shards while running, tasks read them from the earliest sequence number regardless of the useEarliestSequenceNumber setting. If resharding occurs while the supervisor is suspended and useEarliestSequenceNumber is false, resuming causes tasks to read the new shards from the latest sequence number.
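The starting-position rule for new shards can be summarized as a small decision function. This is a sketch of the rule as described, with a hypothetical function name; the case where the supervisor was suspended and the useEarliestSequenceNumber ioConfig flag is true is not stated in the text and is assumed here to follow the flag.

```python
def new_shard_start(discovered_while_running: bool, use_earliest_sequence_number: bool) -> str:
    """Return where tasks start reading a newly discovered shard."""
    if discovered_while_running:
        # Shards found by a running supervisor are always read from the start.
        return "earliest"
    # Shards created while the supervisor was suspended follow the flag
    # (assumption for the True case; the False case is from the text).
    return "earliest" if use_earliest_sequence_number else "latest"


running_case = new_shard_start(discovered_while_running=True, use_earliest_sequence_number=False)
suspended_case = new_shard_start(discovered_while_running=False, use_earliest_sequence_number=False)
```

The second case is the one to watch: records written to the new shards during the suspension are skipped when reading starts from the latest sequence number.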

Known Issues

Before deploying to production:
  • Read Throughput Limits: Kinesis imposes a read throughput limit per shard. With multiple supervisors reading from the same stream, add more shards to ensure sufficient read throughput.
  • High IteratorAgeMilliseconds: A Kinesis supervisor may compare checkpoint sequence numbers to the retention window. These checks fetch the earliest sequence number, which can cause IteratorAgeMilliseconds to become very high in AWS CloudWatch.

Supervisor API

Manage and monitor supervisors via API

Supervisor Reference

Supervisor status and capacity planning

Data Formats

Supported input formats

AWS IAM Guide

AWS Identity and Access Management
