This guide covers JSON-based batch ingestion using ingestion specs. For SQL-based batch ingestion using the druid-multi-stage-query extension, see SQL-based ingestion.
For a comparison of batch ingestion methods and to determine which is right for you, see the ingestion methods table.

Overview

Apache Druid supports the following types of JSON-based batch indexing tasks:
  • Parallel task indexing (index_parallel) - Runs multiple indexing tasks concurrently. Recommended for production ingestion tasks.
  • Simple task indexing (index) - Runs a single indexing task at a time. Suitable for development and test environments.
This page focuses on the index_parallel ingestion specs.

Submit an Indexing Task

You can submit a JSON-based batch indexing task in two ways:
  1. Use the Load Data UI in the web console to define and submit an ingestion spec
  2. POST the ingestion spec to the Tasks API endpoint: /druid/indexer/v1/task
Alternatively, use the indexing script: bin/post-index-task
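As a sketch, an ingestion spec saved to a file (the filename, host, and port below are illustrative; adjust them for your deployment) can be submitted to the Tasks API with curl:

```shell
# Submit a JSON-based batch ingestion spec to the Tasks API.
# Assumes the Overlord is reachable at localhost:8081 and the spec
# is saved as spec.json in the current directory.
curl -X POST \
  -H 'Content-Type: application/json' \
  -d @spec.json \
  http://localhost:8081/druid/indexer/v1/task
```

On success, the API responds with the ID of the submitted task, which you can use to monitor its status.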

Parallel Task Indexing

The index_parallel task type enables multi-threaded batch indexing using only Druid resources - no external systems like Hadoop are required.

How It Works

The index_parallel task operates as a supervisor that orchestrates the entire indexing process:
  1. The supervisor splits input data into portions
  2. Worker tasks are created to process individual data portions
  3. The Overlord schedules and runs workers on Middle Managers or Indexers
  4. Workers report resulting segment lists back to the supervisor
  5. The supervisor monitors worker status and retries failed tasks
  6. When all workers succeed, segments are published atomically
Behavior varies depending on the partitionsSpec configuration. See PartitionsSpec for details.

Requirements

Parallel tasks require:
  • A splittable inputSource in the ioConfig
  • maxNumConcurrentSubTasks > 1 in the tuningConfig
If maxNumConcurrentSubTasks is 1 or less, tasks run sequentially.
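For example, an ioConfig and tuningConfig pairing that satisfies both requirements might look like the following (the baseDir path is illustrative):

```json
{
  "ioConfig": {
    "type": "index_parallel",
    "inputSource": {
      "type": "local",
      "baseDir": "/data/input/",
      "filter": "*.json"
    },
    "inputFormat": { "type": "json" }
  },
  "tuningConfig": {
    "type": "index_parallel",
    "maxNumConcurrentSubTasks": 4
  }
}
```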

Supported Compression Formats

JSON-based batch ingestion supports:
  • bz2
  • gz
  • xz
  • zip
  • sz (Snappy)
  • zst (ZSTD)

Configuration Example

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia_parallel_index_test",
      "timestampSpec": {
        "column": "timestamp"
      },
      "dimensionsSpec": {
        "dimensions": [
          "country",
          "page",
          "language",
          "user",
          "unpatrolled",
          "newPage",
          "robot",
          "anonymous",
          "namespace",
          "continent",
          "region",
          "city"
        ]
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "doubleSum",
          "name": "added",
          "fieldName": "added"
        },
        {
          "type": "doubleSum",
          "name": "deleted",
          "fieldName": "deleted"
        },
        {
          "type": "doubleSum",
          "name": "delta",
          "fieldName": "delta"
        }
      ],
      "granularitySpec": {
        "segmentGranularity": "DAY",
        "queryGranularity": "second",
        "intervals": [
          "2013-08-31/2013-09-02"
        ]
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "examples/indexing/",
        "filter": "wikipedia_index_data*"
      },
      "inputFormat": {
        "type": "json"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": {
        "type": "single_dim",
        "partitionDimension": "country",
        "targetRowsPerSegment": 5000000
      },
      "maxNumConcurrentSubTasks": 2
    }
  }
}

Configuration Reference

Top-Level Properties

type
string
required
The task type. Set to index_parallel for parallel task indexing.
id
string
The task ID. If omitted, Druid generates the task ID using the task type, data source name, interval, and date-time stamp.
spec
object
required
The ingestion spec that defines the data schema, IO config, and tuning config.
context
object
Context to specify various task configuration parameters. See Task context parameters.

dataSchema

Defines how Druid stores your data: the primary timestamp column, dimensions, metrics, and transformations.
When defining granularitySpec, explicitly define intervals if you know the time range of your data. Explicit intervals let locking failures surface sooner and prevent the task from accidentally replacing data outside the intended range.
See Ingestion Spec DataSchema for complete details.
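Following the advice above, a granularitySpec with explicitly defined intervals might look like this (the interval shown is illustrative):

```json
"granularitySpec": {
  "segmentGranularity": "DAY",
  "queryGranularity": "HOUR",
  "intervals": ["2024-01-01/2024-02-01"]
}
```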

ioConfig

type
string
required
Set to index_parallel.
inputFormat
object
required
Specifies how to parse input data. See input formats.
appendToExisting
boolean
default:false
Creates segments as additional shards of the latest version. You must use dynamic partitioning type for appended segments.
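For instance, an ioConfig that appends new data to existing segments (which requires the dynamic partitioning type in the tuningConfig) might look like this; the path is illustrative:

```json
"ioConfig": {
  "type": "index_parallel",
  "inputSource": {
    "type": "local",
    "baseDir": "/data/new/",
    "filter": "*.json"
  },
  "inputFormat": { "type": "json" },
  "appendToExisting": true
}
```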
dropExisting
boolean
default:false
This feature is experimental.
If true, appendToExisting is false, and granularitySpec contains an interval, the ingestion task replaces all existing segments fully contained by the specified interval.

tuningConfig

type
string
required
Set to index_parallel.
maxRowsInMemory
integer
default:1000000
Determines when Druid should perform intermediate persists to disk. Normally you don’t need to set this.
maxBytesInMemory
long
default:"1/6 of max JVM memory"
Number of bytes to aggregate in heap memory before persisting. Maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).
partitionsSpec
object
default:"dynamic"
Defines how to partition data in each time chunk. See PartitionsSpec.
maxNumConcurrentSubTasks
integer
default:1
Maximum number of worker tasks that can run in parallel. If set to 1, the supervisor processes data ingestion on its own.
maxRetry
integer
default:3
Maximum number of retries on task failures.
taskStatusCheckPeriodMs
long
default:1000
Polling period in milliseconds to check running task statuses.

PartitionsSpec

The primary partition for Druid is time. You can define a secondary partitioning method in the partitions spec.

Partitioning Comparison

PartitionsSpec | Speed    | Method                 | Rollup Mode | Query Pruning
dynamic        | Fastest  | Row count-based        | Best-effort | N/A
hashed         | Moderate | Hash-based             | Perfect     | Yes
single_dim     | Slower   | Single dimension range | Perfect     | Yes
range          | Slowest  | Multi-dimension range  | Perfect     | Yes

Dynamic Partitioning

Dynamic partitioning offers the fastest ingestion speed; rollup is best-effort rather than perfect.
type
string
required
Set to dynamic.
maxRowsPerSegment
integer
default:5000000
Determines how many rows are in each segment.
maxTotalRows
long
default:20000000
Total number of rows across all segments waiting to be pushed.
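A dynamic partitionsSpec using the default values shown above would look like:

```json
"partitionsSpec": {
  "type": "dynamic",
  "maxRowsPerSegment": 5000000,
  "maxTotalRows": 20000000
}
```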

Hash-Based Partitioning

For perfect rollup with hash-based partitioning on specified dimensions.
type
string
required
Set to hashed.
numShards
integer
Directly specify the number of shards to create. Cannot be used with targetRowsPerSegment.
targetRowsPerSegment
integer
default:5000000
Target row count for each partition. Druid determines partition count automatically.
partitionDimensions
array
The dimensions to partition on. Leave blank to select all dimensions.
partitionFunction
string
default:"murmur3_32_abs"
Hash function to compute hash of partition dimensions. Currently only murmur3_32_abs is supported.
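As an illustration, a hashed partitionsSpec that partitions on two dimensions from the Wikipedia example above might look like:

```json
"partitionsSpec": {
  "type": "hashed",
  "partitionDimensions": ["country", "page"],
  "targetRowsPerSegment": 5000000
}
```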

Single-Dimension Range Partitioning

For perfect rollup with range partitioning on a single dimension.
Single dimension range partitioning is not supported in sequential mode.
type
string
required
Set to single_dim.
partitionDimension
string
required
The dimension to partition on. Only rows with a single dimension value are allowed.
targetRowsPerSegment
integer
Target number of rows to include in a partition. Targets segments of 500MB-1GB. Either this or maxRowsPerSegment is required.
assumeGrouped
boolean
default:false
Assume input data is already grouped on time and dimensions. Ingestion runs faster but may choose sub-optimal partitions if violated.

Multi-Dimension Range Partitioning

Improves over single-dimension range partitioning by distributing segment sizes more evenly.
type
string
required
Set to range.
partitionDimensions
array
required
Array of dimensions to partition on. Order from most to least frequently queried. Limit to 3-5 dimensions for best results.
targetRowsPerSegment
integer
Target number of rows per partition, targeting 500MB-1GB segments.
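As an illustration, a range partitionsSpec on two of the dimensions from the Wikipedia example above, ordered from most to least frequently queried, might look like:

```json
"partitionsSpec": {
  "type": "range",
  "partitionDimensions": ["country", "city"],
  "targetRowsPerSegment": 5000000
}
```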

Splittable Input Sources

Parallel tasks require a splittable inputSource, such as local, http, s3, google (Google Cloud Storage), azure, or hdfs. See Input Sources for the full list.

Capacity Planning

The supervisor task creates up to maxNumConcurrentSubTasks worker tasks regardless of available task slots, so each parallel index task occupies up to maxNumConcurrentSubTasks + 1 slots (the workers plus the supervisor itself).
If you run parallel index tasks alongside streaming ingestion, cap the capacity used by batch ingestion so that it cannot block stream ingestion.
To keep t concurrent parallel index tasks within a budget of b task slots, ensure:
(sum of maxNumConcurrentSubTasks across the t tasks) + t < b
For example, two parallel index tasks each with maxNumConcurrentSubTasks: 5 occupy up to (5 + 5) + 2 = 12 task slots.

HTTP Status Endpoints

The supervisor task provides HTTP endpoints to monitor running status:

Mode Endpoint

GET http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/mode
Returns parallel if running in parallel mode, otherwise sequential.

Phase Endpoint

GET http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/phase
Returns the name of the current phase in parallel mode.

Progress Endpoint

GET http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress
Returns estimated progress of the current phase:
{
  "running": 10,
  "succeeded": 0,
  "failed": 0,
  "complete": 0,
  "total": 10,
  "estimatedExpectedSucceeded": 10
}

Batch Ingestion Tutorial

Step-by-step guide to loading a file

Input Sources

Available input sources for batch ingestion

Data Formats

Supported input formats

Tasks API

API reference for task management
