Setup
To use the Kinesis indexing service, load the druid-kinesis-indexing-service core extension on both the Overlord and the Middle Manager.
See Loading extensions for more information.
Supervisor Spec Configuration
This section covers Kinesis-specific configuration properties. For properties shared across all streaming methods, see Supervisor spec.
I/O Configuration
Kinesis-specific ioConfig properties:
- stream: The Kinesis stream to read.
- endpoint: The AWS Kinesis stream endpoint for a region. See AWS service endpoints.
- useEarliestSequenceNumber: If a supervisor manages a datasource for the first time, this determines whether to retrieve the earliest or latest sequence numbers in Kinesis.
- fetchDelayMillis: Time in milliseconds to wait between subsequent calls to fetch records from Kinesis. See Determine fetch settings.
- awsAssumedRoleArn: The AWS assumed role to use for additional permissions.
- awsExternalId: The AWS external ID to use for additional permissions.
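As a minimal sketch, the Kinesis-specific ioConfig properties can be assembled as a Python dict and serialized to JSON for a supervisor spec. The stream name, role ARN, and external ID are hypothetical placeholders, and the properties shared across streaming methods are omitted.

```python
import json

# Hypothetical values for illustration; substitute your own stream and AWS identifiers.
io_config = {
    "type": "kinesis",
    "stream": "example-stream",                      # hypothetical stream name
    "endpoint": "kinesis.us-east-1.amazonaws.com",   # regional Kinesis endpoint
    "useEarliestSequenceNumber": False,              # first run starts from the latest records
    "fetchDelayMillis": 0,                           # no pause between fetch calls
    "awsAssumedRoleArn": "arn:aws:iam::123456789012:role/example-druid-kinesis",  # hypothetical
    "awsExternalId": "example-external-id",          # hypothetical
}

print(json.dumps(io_config, indent=2))
```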
Data Format
The Kinesis indexing service supports both inputFormat and parser to specify the data format. Use inputFormat unless you need a format only supported by the legacy parser.
Supported inputFormat values:
- kinesis
- csv
- tsv
- json
- avro_stream
- protobuf
Use parser to read Thrift formats.
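The rule "prefer inputFormat, fall back to parser" can be expressed as a small lookup. This is a hypothetical helper, not part of Druid; rejecting formats that this section does not list is an assumption made for the sketch.

```python
# Formats listed above as supported by inputFormat.
INPUT_FORMAT_TYPES = {"kinesis", "csv", "tsv", "json", "avro_stream", "protobuf"}
# Formats that still require the legacy parser.
PARSER_ONLY_TYPES = {"thrift"}


def format_spec_key(format_type: str) -> str:
    """Return which spec field describes the data format, preferring inputFormat."""
    if format_type in INPUT_FORMAT_TYPES:
        return "inputFormat"
    if format_type in PARSER_ONLY_TYPES:
        return "parser"
    # Hypothetical policy: reject formats this section does not list.
    raise ValueError(f"format not listed for Kinesis ingestion: {format_type!r}")


print(format_spec_key("json"))    # inputFormat
print(format_spec_key("thrift"))  # parser
```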
Tuning Configuration
Kinesis-specific tuningConfig properties:
- skipSequenceNumberAvailabilityCheck: Whether to skip checking if the current sequence number is still available in a Kinesis shard. If false, the task performs the check and, if the sequence number is no longer available, attempts to reset based on resetOffsetAutomatically.
- recordBufferSizeBytes: The size of the buffer (heap memory bytes) between Kinesis fetch threads and the main ingestion thread. See Determine fetch settings for defaults.
- recordBufferOfferTimeout: Milliseconds to wait for space in the buffer before timing out.
- recordBufferFullWait: Milliseconds to wait for the buffer to drain before attempting to fetch records again.
- fetchThreads: Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.
- maxBytesPerPoll: Maximum bytes to fetch from the buffer per poll. At least one record is polled regardless of this setting.
- repartitionTransitionDuration: When shards split or merge, the time to wait for the stream to write records to the new shards. This helps avoid issues with empty shard handling.
- useListShards: If true, use the ListShards API to prevent LimitExceededException. Requires appropriate IAM permissions.
AWS Authentication
Druid uses AWS access and secret keys to authenticate Kinesis API requests.
Option 1: Roles or Short-term Credentials (Recommended)
Druid looks for credentials in this order:
- Environment variables
- Web Identity Token
- Default profile configuration file
- EC2 instance profile provider
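The lookup order above behaves like a provider chain: each source is tried in turn and the first one that yields credentials wins. The sketch below models only that ordering. It is not Druid's implementation; only the environment-variable provider (using the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY variables) is implemented, and the other providers are hypothetical placeholders.

```python
import os
from typing import Callable, List, Optional, Tuple

Credentials = Tuple[str, str]  # (access key ID, secret access key)
Provider = Callable[[], Optional[Credentials]]


def from_environment() -> Optional[Credentials]:
    """Read the standard AWS environment variables, if both are set."""
    key = os.environ.get("AWS_ACCESS_KEY_ID")
    secret = os.environ.get("AWS_SECRET_ACCESS_KEY")
    return (key, secret) if key and secret else None


def unavailable() -> Optional[Credentials]:
    """Hypothetical placeholder for a provider not implemented in this sketch."""
    return None


# Order mirrors the list above; only the environment provider is implemented here.
DEFAULT_CHAIN: List[Provider] = [
    from_environment,  # environment variables
    unavailable,       # web identity token (placeholder)
    unavailable,       # default profile configuration file (placeholder)
    unavailable,       # EC2 instance profile provider (placeholder)
]


def resolve_credentials(chain: List[Provider]) -> Credentials:
    """Return credentials from the first provider in the chain that supplies them."""
    for provider in chain:
        credentials = provider()
        if credentials is not None:
            return credentials
    raise RuntimeError("no AWS credentials found in the provider chain")
```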
Option 2: Long-term Security Credentials
You can provide credentials in common.runtime.properties by setting druid.kinesis.accessKey and druid.kinesis.secretKey.
Required IAM Permissions
Permissions depend on the useListShards flag.
- useListShards = true
- useListShards = false
Required permissions:
- ListStreams: List your data streams
- Get*: Required for GetShardIterator
- GetRecords: Get data records from shards
- ListShards: Get shards for a stream
Example Policy
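A minimal sketch of an IAM policy document granting the permissions listed above, generated with Python. It assumes a single Allow statement with Resource set to "*" for brevity; in practice you would likely scope Resource to your stream's ARN.

```python
import json

# Kinesis actions from the permission list above, in IAM "service:Action" form.
KINESIS_ACTIONS = [
    "kinesis:ListStreams",
    "kinesis:Get*",        # wildcard covers GetShardIterator (and GetRecords)
    "kinesis:GetRecords",
    "kinesis:ListShards",
]


def build_policy(resource: str = "*") -> dict:
    """Build an IAM identity policy allowing the listed Kinesis actions on `resource`."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": list(KINESIS_ACTIONS), "Resource": resource}
        ],
    }


print(json.dumps(build_policy(), indent=2))
```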
Shards and Segment Handoff
Each Kinesis indexing task writes events from Kinesis shards into a single segment until it reaches one of these limits:
- maxRowsPerSegment
- maxTotalRows
- intermediateHandoffPeriod
Incremental Hand-offs
Tasks perform incremental hand-offs so segments are available as they’re created. When a task reaches one of these limits, it:
- Hands off all segments
- Creates a new set of segments
- Continues ingestion
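The hand-off trigger is an "any limit reached" check. This sketch is illustrative only: the dataclass, the function name, the >= comparisons, and expressing intermediateHandoffPeriod in seconds are assumptions, and the limit values are example numbers rather than recommendations.

```python
from dataclasses import dataclass


@dataclass
class HandoffLimits:
    max_rows_per_segment: int
    max_total_rows: int
    intermediate_handoff_period_s: float  # assumption: period expressed in seconds


def should_hand_off(limits: HandoffLimits, largest_segment_rows: int,
                    total_rows: int, seconds_since_handoff: float) -> bool:
    """True once any one of the three limits is reached."""
    return (
        largest_segment_rows >= limits.max_rows_per_segment
        or total_rows >= limits.max_total_rows
        or seconds_since_handoff >= limits.intermediate_handoff_period_s
    )


limits = HandoffLimits(max_rows_per_segment=5_000_000, max_total_rows=20_000_000,
                       intermediate_handoff_period_s=3600)
print(should_hand_off(limits, 1_000, 1_000, 60))          # False: no limit reached
print(should_hand_off(limits, 5_000_000, 5_000_000, 60))  # True: segment row limit
```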
Small Segments
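Small segments may still be produced. For example, suppose:
- Task duration: 4 hours
- Segment granularity: HOUR
- Supervisor started: 9:10
After 4 hours, at 13:10, Druid starts a new set of tasks, so events in the 13:00 to 14:00 interval may be split between the old and new tasks, producing small segments.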
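Using the example values (a supervisor started at 9:10 with 4-hour tasks and HOUR granularity), a minimal sketch can compute which granularity bucket straddles the task boundary and is therefore split across two sets of tasks. The function name and the calendar date are hypothetical, and the sketch assumes the granularity evenly divides a day.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def split_bucket(start: datetime, task_duration: timedelta,
                 granularity: timedelta) -> Optional[Tuple[datetime, datetime]]:
    """Return the bucket the task boundary falls inside, or None if it is aligned.

    Assumes the granularity evenly divides a day (for example, HOUR).
    """
    boundary = start + task_duration
    midnight = boundary.replace(hour=0, minute=0, second=0, microsecond=0)
    offset = (boundary - midnight) % granularity
    if offset == timedelta(0):
        return None  # boundary sits on a bucket edge, so no bucket is split
    bucket_start = boundary - offset
    return bucket_start, bucket_start + granularity


bucket = split_bucket(datetime(2024, 1, 1, 9, 10), timedelta(hours=4), timedelta(hours=1))
print(bucket[0].strftime("%H:%M"), bucket[1].strftime("%H:%M"))  # 13:00 14:00
```

Starting the supervisor on a granularity boundary (for example, 9:00) makes the function return None, because each task set then ends exactly at a bucket edge.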
Determine Fetch Settings
Kinesis indexing tasks fetch records using fetchThreads threads. If fetchThreads exceeds the number of Kinesis shards, excess threads remain unused.
Each fetch thread:
- Fetches up to 10 MB of records at once
- Waits fetchDelayMillis between fetches
- Pushes records into a shared queue of size recordBufferSizeBytes
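A simplified sketch of the shared queue: fetch threads call offer with a timeout (the role recordBufferOfferTimeout plays) and the ingestion thread calls poll with a byte limit (the role of maxBytesPerPoll), always receiving at least one record when any are buffered. The class and method names are hypothetical, this is not Druid's implementation, and fetchDelayMillis pacing is left out.

```python
import threading
from collections import deque
from typing import Deque, List


class RecordBuffer:
    """Byte-bounded queue between fetch threads (producers) and the ingestion thread."""

    def __init__(self, capacity_bytes: int):
        self.capacity_bytes = capacity_bytes  # plays the role of recordBufferSizeBytes
        self._records: Deque[bytes] = deque()
        self._size = 0
        self._cond = threading.Condition()

    def offer(self, record: bytes, timeout_s: float) -> bool:
        """Wait up to timeout_s for room; return False if the buffer stays full."""
        with self._cond:
            # Assumption: an empty buffer always accepts a record, even an oversized one.
            has_room = self._cond.wait_for(
                lambda: not self._records or self._size + len(record) <= self.capacity_bytes,
                timeout=timeout_s,
            )
            if not has_room:
                return False
            self._records.append(record)
            self._size += len(record)
            self._cond.notify_all()
            return True

    def poll(self, max_bytes: int) -> List[bytes]:
        """Take records up to max_bytes, but always at least one if any are buffered."""
        with self._cond:
            batch: List[bytes] = []
            taken = 0
            while self._records and (not batch or taken + len(self._records[0]) <= max_bytes):
                record = self._records.popleft()
                batch.append(record)
                taken += len(record)
            self._size -= taken
            if batch:
                self._cond.notify_all()  # wake fetch threads waiting for room
            return batch
```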
Default Values
- fetchThreads: 2 * processors available to the task
  - Limited to prevent total fetched data from exceeding 5% of max heap
  - Processors available = total processors / druid.worker.capacity
- fetchDelayMillis: 0 (no delay)
- recordBufferSizeBytes: The smaller of 100 MB or 10% of available heap
- maxBytesPerPoll: 1000000
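The default rules above can be turned into arithmetic. This sketch follows the stated rules but makes several labeled assumptions: decimal megabytes, integer division when splitting processors across task slots, a floor of one fetch thread, and max heap standing in for "available heap". The helper names are hypothetical.

```python
MB = 1_000_000  # assumption: decimal megabytes
FETCH_BYTES_PER_THREAD = 10 * MB  # each fetch thread retrieves up to 10 MB at once


def default_fetch_threads(total_processors: int, worker_capacity: int, max_heap_bytes: int) -> int:
    """2 * processors available to the task, capped so fetched data stays within 5% of max heap."""
    processors_for_task = total_processors // worker_capacity  # assumption: integer division
    requested = 2 * processors_for_task
    heap_cap = (max_heap_bytes // 20) // FETCH_BYTES_PER_THREAD  # 5% of heap / 10 MB per thread
    return max(1, min(requested, heap_cap))  # assumption: never fewer than one thread


def default_record_buffer_size_bytes(available_heap_bytes: int) -> int:
    """The smaller of 100 MB or 10% of available heap."""
    return min(100 * MB, available_heap_bytes // 10)


def default_tuning_config(total_processors: int, worker_capacity: int, max_heap_bytes: int) -> dict:
    """Hypothetical helper collecting the computed defaults into tuningConfig fields."""
    return {
        "type": "kinesis",
        "fetchThreads": default_fetch_threads(total_processors, worker_capacity, max_heap_bytes),
        "recordBufferSizeBytes": default_record_buffer_size_bytes(max_heap_bytes),  # assumption
        "maxBytesPerPoll": 1_000_000,
    }
```

For example, a 16-processor server with druid.worker.capacity = 4 and an 8 GB heap gives 8 fetch threads, while a 1 GB heap caps the same server at 5 threads.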
Kinesis API Limits
- Each data record: Up to 1 MB
- Each shard: Up to 5 transactions per second for reads
- Each shard: Up to 2 MB per second
- GetRecords max return: 10 MB
Exceeding these limits causes Kinesis to return a ProvisionedThroughputExceededException. When this happens, Druid tasks pause for the larger of fetchDelayMillis or 3 seconds, then retry.
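A minimal sketch of the pause-and-retry behavior. The exception class is a hypothetical stand-in for the Kinesis error, and the max_attempts cap is an assumption added so the sketch terminates; the text above does not state a retry limit.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

MIN_PAUSE_MS = 3000  # tasks pause at least 3 seconds after a throughput error


class ProvisionedThroughputExceeded(Exception):
    """Hypothetical stand-in for Kinesis' ProvisionedThroughputExceededException."""


def pause_seconds(fetch_delay_millis: int) -> float:
    """The larger of fetchDelayMillis or 3 seconds, converted to seconds."""
    return max(fetch_delay_millis, MIN_PAUSE_MS) / 1000.0


def fetch_with_retry(fetch: Callable[[], T], fetch_delay_millis: int,
                     max_attempts: int = 5, sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fetch, pausing and retrying on throughput errors (max_attempts is hypothetical)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return fetch()
        except ProvisionedThroughputExceeded:
            if attempt >= max_attempts:
                raise
            sleep(pause_seconds(fetch_delay_millis))
            attempt += 1
```

Injecting sleep keeps the function testable: passing a list's append method records the pauses instead of waiting.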
Deaggregation
The Kinesis indexing service supports deaggregation of multiple rows stored within a single Kinesis Data Streams record for more efficient data transfer.
Resharding
Resharding lets you adjust the number of shards in a stream to adapt to changes in data flow rate. During resharding:
- Ingestion tasks may shut down early
- Tasks may fail
- The supervisor updates shard-to-task-group mappings
- Tasks with closed shards are shut down
- Distribution of active shards is balanced across tasks
- All closed shards are fully read and published
- Tasks with inactive shard assignments are shut down
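A hypothetical sketch of two resharding behaviors described in this section: spreading active shards evenly across task groups, and choosing where tasks start reading a newly discovered shard. The round-robin assignment is an illustrative stand-in for "balanced", not Druid's actual algorithm; the start-position function encodes the rules in the paragraph that follows, and treating a suspended supervisor with useEarliestSequenceNumber = true as starting from the earliest sequence number is an assumption.

```python
from typing import Dict, List


def assign_shards(active_shards: List[str], task_count: int) -> Dict[int, List[str]]:
    """Round-robin active shards across task groups so group sizes differ by at most one."""
    groups: Dict[int, List[str]] = {group_id: [] for group_id in range(task_count)}
    for index, shard_id in enumerate(sorted(active_shards)):
        groups[index % task_count].append(shard_id)
    return groups


def start_position(found_while_running: bool, use_earliest_sequence_number: bool) -> str:
    """Where tasks begin reading a newly discovered shard."""
    if found_while_running:
        return "EARLIEST"  # always earliest, regardless of the setting
    # Shards discovered after a suspension follow useEarliestSequenceNumber.
    return "EARLIEST" if use_earliest_sequence_number else "LATEST"
```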
When the supervisor detects new partitions while running, tasks read them from the earliest sequence number regardless of the useEarliestSequenceNumber setting. If resharding occurs while the supervisor is suspended and useEarliestSequenceNumber is false, resuming causes tasks to read new shards from the latest sequence number.
Known Issues
Before deploying to production:
Related Resources
- Supervisor API: Manage and monitor supervisors via API
- Supervisor Reference: Supervisor status and capacity planning
- Data Formats: Supported input formats
- AWS IAM Guide: AWS Identity and Access Management