Understanding Segments
Each segment is a single file, typically comprising up to a few million rows of data. Since segments are organized into time chunks, it’s helpful to think of segments as living on a timeline. A datasource may have anywhere from just a few segments up to hundreds of thousands or even millions of segments.
Segment Building Process
Each segment is created by a Middle Manager as mutable and uncommitted. Data is queryable as soon as it is added to an uncommitted segment. The segment building process accelerates later queries by producing a data file that is compact and indexed:

Conversion to columnar format
Data is restructured into a column-oriented format for efficient scanning
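As a toy sketch (not Druid’s actual segment format, which also layers indexes and compression on top), the row-to-column restructuring can be pictured like this:

```python
# Toy illustration of columnar restructuring; all field names are invented.
rows = [
    {"ts": "2024-01-01T00:00Z", "page": "A", "views": 3},
    {"ts": "2024-01-01T01:00Z", "page": "B", "views": 5},
]

# Pivot the row-oriented records into one list per column.
columns = {key: [row[key] for row in rows] for key in rows[0]}

# A query that only needs "views" now scans a single contiguous list
# instead of touching every field of every row.
print(columns["views"])  # → [3, 5]
```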
Indexing and Handoff
Indexing is the mechanism by which new segments are created, and handoff is the mechanism by which they are published and served by Historical services.

On the Indexing Side
Task starts and determines segment identifier
An indexing task starts running and building a new segment. It must determine the identifier before it starts building:
- Appending tasks (Kafka task, index task in append mode): Call an “allocate” API on the Overlord
- Overwriting tasks (Hadoop task, index task not in append mode): Lock an interval and create a new version number
Real-time data becomes queryable
If the indexing task is a realtime task (like a Kafka task), the segment is immediately queryable at this point. It’s available, but unpublished.
Push to deep storage and publish
When the indexing task has finished reading data for the segment, it pushes it to deep storage and then publishes it by writing a record into the metadata store.
On the Coordinator / Historical Side
Coordinator polls metadata store
The Coordinator polls the metadata store periodically (by default, every 1 minute) for newly published segments.
Choose Historical for loading
When the Coordinator finds a segment that is published and used, but unavailable, it chooses a Historical service to load that segment and instructs that Historical to do so.
Segment Identifiers
Segments all have a four-part identifier with the following components:

Datasource name
The name of the datasource the segment belongs to

Interval
The time chunk containing the segment; corresponds to the segmentGranularity specified at ingestion time

Version
Generally an ISO 8601 timestamp corresponding to when the segment set was first started

Partition number
An integer, unique within a datasource+interval+version; not necessarily contiguous
Example Segment Identifiers
For a segment in datasource clarity-cloud0, time chunk 2018-05-21T16:00:00.000Z/2018-05-21T17:00:00.000Z, version 2018-05-21T15:56:09.909Z, and partition number 1, the identifier combines all four components in order.
Segments with partition number 0 (the first partition in a chunk) omit the partition number from the identifier.
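A minimal sketch of how the four components combine, assuming Druid’s underscore-separated identifier convention (the helper name `segment_id` is illustrative):

```python
def segment_id(datasource, interval_start, interval_end, version, partition_num):
    """Join the four identifier components with underscores.

    Per the rule above, partition number 0 (the first partition in a
    chunk) is omitted from the identifier.
    """
    parts = [datasource, interval_start, interval_end, version]
    if partition_num != 0:
        parts.append(str(partition_num))
    return "_".join(parts)


sid = segment_id(
    "clarity-cloud0",
    "2018-05-21T16:00:00.000Z",
    "2018-05-21T17:00:00.000Z",
    "2018-05-21T15:56:09.909Z",
    1,
)
# → "clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909Z_1"
```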
Segment Versioning
The version number provides a form of multi-version concurrency control (MVCC) to support batch-mode overwriting.

Append Only
If all you ever do is append data, then there will be just a single version for each time chunk.

Overwriting
If you overwrite data, Druid seamlessly switches queries from the old version to the new one by:
- First loading the new data (but not allowing it to be queried)
- As soon as the new data is all loaded, switching all new queries to use those new segments
- Then dropping the old segments a few minutes later
Segment Lifecycle
Each segment has a lifecycle that involves three major areas:

Metadata Store
Segment metadata (a small JSON payload) is stored once a segment is done being constructed. The act of inserting a record is called publishing. Records have a boolean flag named used that controls whether the segment is intended to be queryable.

Deep Storage
Segment data files are pushed to deep storage once a segment is done being constructed. This happens immediately before publishing metadata to the metadata store.
Availability for Querying
Segments are available for querying on some Druid data server, like a realtime task, directly from deep storage, or a Historical service.
Segment State Flags
You can inspect the state of currently active segments using the Druid SQL sys.segments table. It includes the following flags:

- is_published: True if segment metadata has been published to the metadata store and used is true
- is_available: True if the segment is currently available for querying, either on a realtime task or a Historical service
- is_realtime: True if the segment is only available on realtime tasks
- is_overshadowed: True if the segment is published and is fully overshadowed by some other published segments
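The flag definitions above can be sketched as a small derivation; the function and its inputs are illustrative, not Druid’s internals:

```python
def segment_flags(in_metadata_store, used, serving_server_types, overshadowed):
    """Derive the sys.segments flags from a segment's state (illustrative)."""
    is_published = in_metadata_store and used
    is_available = bool(serving_server_types)
    # "Only available on realtime tasks": served, and by nothing else.
    is_realtime = bool(serving_server_types) and serving_server_types <= {"realtime"}
    is_overshadowed = is_published and overshadowed
    return {
        "is_published": is_published,
        "is_available": is_available,
        "is_realtime": is_realtime,
        "is_overshadowed": is_overshadowed,
    }


# A realtime segment before publish: queryable but not yet published.
pre_handoff = segment_flags(False, False, {"realtime"}, False)

# The same segment after publish and handoff to a Historical service.
post_handoff = segment_flags(True, True, {"historical"}, False)
```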
Availability and Consistency
Druid has an architectural separation between ingestion and querying. When understanding Druid’s availability and consistency properties, we must look at each function separately.

Ingestion Side: Transactional Guarantees
Druid’s primary ingestion methods are all pull-based and offer transactional guarantees. Ingestion using these methods will publish in an all-or-nothing manner:

Supervised Seekable-Stream Ingestion
Methods like Kafka and Kinesis. Druid commits stream offsets to its metadata store alongside segment metadata, in the same transaction. This ensures exactly-once publishing behavior.
Ingestion of data that has not yet been published can be rolled back if ingestion tasks fail. Partially-ingested data is discarded, and Druid will resume ingestion from the last committed set of stream offsets.
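The same-transaction pairing of stream offsets and segment metadata can be sketched with SQLite standing in for the metadata store (table names and schema are invented for illustration, not Druid’s actual schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE segments (id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE offsets (part_id INTEGER PRIMARY KEY, next_offset INTEGER)")

def publish(conn, segment_ids, offsets):
    # One transaction covers both the committed stream offsets and the
    # segment metadata; if either write fails, both roll back.
    with conn:  # sqlite3 commits on success, rolls back on exception
        conn.executemany(
            "INSERT OR REPLACE INTO offsets (part_id, next_offset) VALUES (?, ?)",
            list(offsets.items()),
        )
        conn.executemany(
            "INSERT INTO segments (id) VALUES (?)",
            [(s,) for s in segment_ids],
        )

publish(conn, ["seg-A"], {0: 100})

# A retried publish of the same segment fails on the duplicate id, and the
# offset update rolls back with it; no partial state is ever committed.
try:
    publish(conn, ["seg-A"], {0: 200})
except sqlite3.IntegrityError:
    pass

committed_offset = conn.execute(
    "SELECT next_offset FROM offsets WHERE part_id = 0"
).fetchone()[0]
print(committed_offset)  # → 100
```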
Hadoop-based Batch Ingestion
Each task publishes all segment metadata in a single transaction.
Native Batch Ingestion
- Parallel mode: The supervisor task publishes all segment metadata in a single transaction after the subtasks are finished
- Simple mode: The single task publishes all segment metadata in a single transaction after it is complete
Idempotency Guarantee
Some ingestion methods offer an idempotency guarantee. This means that repeated executions of the same ingestion will not cause duplicate data to be ingested:

- ✅ Supervised seekable-stream ingestion (Kafka, Kinesis): idempotent
- ⚠️ Hadoop-based batch ingestion: idempotent unless the input source is the same Druid datasource
- ⚠️ Native batch ingestion: idempotent unless appendToExisting is true or the input source is the same Druid datasource
Query Side: Atomic Replacement
The Druid Broker is responsible for ensuring that a consistent set of segments is involved in a given query. It selects the appropriate set of segment versions to use when the query starts, based on what is currently available.

Atomic replacement ensures that, from a user’s perspective, queries flip instantaneously from an older version of data to a newer set of data, with no consistency or performance impact. This is used for Hadoop-based batch ingestion, native batch ingestion when appendToExisting is false, and compaction:

- When a time chunk is overwritten, a new core set of segments is created with a higher version number
- The core set must all be available before the Broker will use them instead of the older set
- There can only be one core set per version per time chunk
- Druid uses only a single version at a time per time chunk
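Under those rules, the Broker’s choice for a single time chunk can be sketched as picking the highest version whose core set is fully available (a simplified illustration; the function and field names are invented):

```python
def visible_version(segments):
    """Return the version to serve for one time chunk, or None.

    segments: dicts with "version", "partition", "core_partitions",
    and "available"; a version wins only when every partition in its
    core set is available.
    """
    by_version = {}
    for seg in segments:
        by_version.setdefault(seg["version"], []).append(seg)

    # Versions are ISO 8601 timestamps, so lexicographic order is
    # chronological order; try the newest version first.
    for version in sorted(by_version, reverse=True):
        group = by_version[version]
        core = group[0]["core_partitions"]
        available = {s["partition"] for s in group if s["available"]}
        if available >= set(range(core)):
            return version
    return None


old = [
    {"version": "2018-05-21T15:56:09.909Z", "partition": p,
     "core_partitions": 2, "available": True}
    for p in (0, 1)
]
new = [
    {"version": "2018-05-21T18:00:00.000Z", "partition": 0,
     "core_partitions": 2, "available": True},
]

# The newer core set is incomplete, so queries stay on the old version.
print(visible_version(old + new))  # → 2018-05-21T15:56:09.909Z
```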