
Overview

Apache Spark provides two approaches for integrating with Apache Kafka:
  1. Structured Streaming + Kafka (Recommended): Modern, DataFrame-based API for Kafka integration
  2. Spark Streaming + Kafka (Legacy): DStreams-based API for Kafka integration
This guide covers both approaches, with emphasis on the recommended Structured Streaming approach.
Structured Streaming is compatible with Kafka broker versions 0.10.0 or higher. For the Kafka headers functionality, your Kafka client version should be 0.11.0.0 or higher.

Structured Streaming + Kafka

Linking

For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.5.0</version>
</dependency>

Reading from Kafka

You can create a streaming DataFrame that reads from one or more Kafka topics:
# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()

# Subscribe with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
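When includeHeaders is enabled, each row's headers field arrives as a list of (key, value) pairs with binary values. As a minimal sketch (the helper name and sample rows are illustrative, not real Kafka output), decoding them on the driver after a collect() might look like:

```python
def decode_headers(headers, encoding="utf-8"):
    """Turn a list of (key, value-bytes) pairs into a dict of strings.
    Illustrative helper, not part of the Spark API."""
    if headers is None:
        return {}
    return {key: value.decode(encoding) for key, value in headers}

# Sample header pairs as they might appear on a collected row
sample = [("trace-id", b"abc123"), ("source", b"orders-service")]
print(decode_headers(sample))  # {'trace-id': 'abc123', 'source': 'orders-service'}
```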

Kafka Source Schema

Each row in the Kafka source has the following schema:
Column          Type       Description
key             binary     Message key
value           binary     Message value
topic           string     Topic name
partition       int        Partition number
offset          long       Offset in partition
timestamp       timestamp  Message timestamp
timestampType   int        Timestamp type
headers         array      Message headers (optional)

Required Options

kafka.bootstrap.servers (string, required)
A comma-separated list of host:port pairs; the Kafka bootstrap servers configuration.

subscribe (string)
A comma-separated list of topics to subscribe to.

subscribePattern (string)
A Java regex pattern matching the topics to subscribe to.

assign (json)
Specific TopicPartitions to consume. Format: {"topicA":[0,1],"topicB":[2,4]}.

Exactly one of "subscribe", "subscribePattern", or "assign" must be specified.
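The JSON value for the assign option (and the other JSON-valued options) is easiest to produce with json.dumps rather than a hand-written string literal. A small sketch, with an illustrative helper name:

```python
import json

def assign_option(partitions_by_topic):
    """Build the JSON string for the "assign" option from a dict of
    topic -> list of partition numbers. Illustrative helper only."""
    return json.dumps(partitions_by_topic)

opt = assign_option({"topicA": [0, 1], "topicB": [2, 4]})
print(opt)  # {"topicA": [0, 1], "topicB": [2, 4]}
# Then pass it along: .option("assign", opt)
```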

Optional Configuration

startingOffsets (string, default: "latest" for streaming, "earliest" for batch)
The start point when a query is started: "earliest", "latest", or a JSON string specifying a starting offset for each TopicPartition. Example: {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}

endingOffsets (string, default: "latest")
The end point when a batch query is ended: either "latest" or a JSON string specifying an ending offset for each TopicPartition.

startingTimestamp (string)
The starting timestamp when a query is started, applied to all partitions of the subscribed topics.

maxOffsetsPerTrigger (long)
Rate limit on the maximum number of offsets processed per trigger interval. The specified total is split proportionally across topicPartitions of different volume.

minOffsetsPerTrigger (long)
Minimum number of offsets to be processed per trigger interval. The specified total is split proportionally across topicPartitions of different volume.

failOnDataLoss (boolean, default: true)
Whether to fail the query when it is possible that data has been lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm; you can disable it when it does not work as expected.

kafkaConsumer.pollTimeoutMs (long, default: 120000)
The timeout in milliseconds to poll data from Kafka in executors.

fetchOffset.numRetries (int, default: 3)
Number of times to retry before giving up fetching Kafka offsets.

fetchOffset.retryIntervalMs (long, default: 10)
Milliseconds to wait before retrying to fetch Kafka offsets.
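The "split proportionally" behaviour of maxOffsetsPerTrigger can be illustrated with a little arithmetic. This is a sketch of the proportional idea only, not Spark's internal algorithm; the function name and partition keys are made up:

```python
def split_offsets(budget, backlog):
    """Split a total offset budget across partitions in proportion to
    each partition's backlog (available offsets). Illustrative sketch,
    not Spark's actual implementation."""
    total = sum(backlog.values())
    if total <= budget:
        return dict(backlog)  # backlog fits: no limiting needed
    return {tp: budget * n // total for tp, n in backlog.items()}

# (topic, partition) -> number of unread offsets
backlog = {("topic1", 0): 6000, ("topic1", 1): 3000, ("topic2", 0): 1000}
print(split_offsets(4000, backlog))
# {('topic1', 0): 2400, ('topic1', 1): 1200, ('topic2', 0): 400}
```

A high-volume partition thus consumes proportionally more of the per-trigger budget than a quiet one.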

Writing to Kafka

You can write the output of a streaming query to Kafka:
# Write key-value to Kafka
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "output-topic") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .start()

# Write to multiple topics dynamically
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .start()
When writing to Kafka, the DataFrame should have the following columns:
  • key (optional): binary or string
  • value (required): binary or string
  • topic (optional for single topic, required for multiple topics): string
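These column rules can be sanity-checked before starting the sink. A minimal sketch (the helper is illustrative, not a Spark API; Spark itself raises an AnalysisException when the rules are violated):

```python
def validate_kafka_write_columns(columns, topic_option_set):
    """Check a DataFrame's columns against the Kafka sink rules:
    `value` is required; the topic must come from either a `topic`
    column or the "topic" write option. Illustrative helper only."""
    errors = []
    if "value" not in columns:
        errors.append("missing required column: value")
    if "topic" not in columns and not topic_option_set:
        errors.append('no "topic" column and no "topic" option set')
    return errors

print(validate_kafka_write_columns(["key", "value"], topic_option_set=True))   # []
print(validate_kafka_write_columns(["key"], topic_option_set=False))
# ['missing required column: value', 'no "topic" column and no "topic" option set']
```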

Batch Queries with Kafka

You can also use Kafka as a source for batch queries:
# Subscribe to 1 topic; defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
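Hand-writing these offset JSON literals is error-prone (escaped braces in format strings are a common source of bugs); building them with json.dumps is safer. A sketch, where -2 refers to the earliest offset and -1 to the latest:

```python
import json

# Build startingOffsets/endingOffsets values programmatically.
# In these maps, -2 means the earliest offset and -1 the latest.
starting = {"topic1": {"0": 23, "1": -2}, "topic2": {"0": -2}}
ending = {"topic1": {"0": 50, "1": -1}, "topic2": {"0": -1}}

starting_json = json.dumps(starting)
ending_json = json.dumps(ending)
# ...then pass e.g. .option("startingOffsets", starting_json)
print(starting_json)  # {"topic1": {"0": 23, "1": -2}, "topic2": {"0": -2}}
```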

Spark Streaming + Kafka (Legacy)

The DStreams-based Kafka integration is legacy code. For new applications, use the Structured Streaming + Kafka approach described above.

Linking (DStreams)

For the DStreams-based Kafka integration:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.5.0</version>
</dependency>
Do not manually add dependencies on org.apache.kafka artifacts. The spark-streaming-kafka-0-10 artifact has the appropriate transitive dependencies already.

Creating a Direct Stream

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

Location Strategies

The new Kafka consumer API pre-fetches messages into buffers. Therefore it’s important for performance to keep cached consumers on executors:
  • PreferConsistent: Distributes partitions evenly across available executors (most common)
  • PreferBrokers: Prefer to schedule partitions on the Kafka leader for that partition (use when executors are on same hosts as Kafka brokers)
  • PreferFixed: Allows you to specify an explicit mapping of partitions to hosts

Obtaining Offsets

import org.apache.spark.TaskContext

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
The typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream. The one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition.

Deploying

As with any Spark applications, use spark-submit to launch your application:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 ...

Best Practices

  • For new applications, always use the Structured Streaming + Kafka integration. It provides better performance, a simpler API, and more features.
  • If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, you will also need to raise group.max.session.timeout.ms on the broker.
  • Always enable checkpointing for production applications to ensure fault tolerance and exactly-once semantics.
  • Monitor your consumer lag to ensure your Spark application is keeping up with the incoming Kafka data.
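Consumer lag is the per-partition gap between the log-end offset and the committed offset. A sketch of the arithmetic (the offsets below are made up; in practice they would come from Kafka admin tooling such as kafka-consumer-groups.sh):

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag = log-end offset minus committed offset.
    Illustrative values; real offsets come from Kafka admin tooling."""
    return {tp: log_end_offsets[tp] - committed_offsets.get(tp, 0)
            for tp in log_end_offsets}

end = {("topic1", 0): 1200, ("topic1", 1): 900}
committed = {("topic1", 0): 1150, ("topic1", 1): 900}
lags = consumer_lag(end, committed)
print(lags)                # {('topic1', 0): 50, ('topic1', 1): 0}
print(sum(lags.values()))  # total lag: 50
```

A total lag that grows over successive checks means the application is falling behind the incoming data.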
