
Overview

Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. The Kinesis integration for Spark Streaming creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
This integration uses the legacy DStreams API. For new applications, consider Structured Streaming with Kafka or another supported source where possible; if you must read from Kinesis, this DStream integration is currently the only built-in option.

Prerequisites

Before you begin, you need:
  • An AWS account with Kinesis access
  • A Kinesis stream created in one of the supported Kinesis regions/endpoints
  • AWS credentials configured (access key and secret key)
You can create a Kinesis stream following the AWS Kinesis documentation.
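If you don't already have a stream, one way to create it is with the AWS CLI (assuming your credentials are configured; the stream name and shard count below are placeholders):

```shell
# Create a stream with two shards (name and count are examples)
aws kinesis create-stream --stream-name my-test-stream --shard-count 2

# Block until the stream becomes ACTIVE before pointing Spark at it
aws kinesis wait stream-exists --stream-name my-test-stream

# Verify the stream's status and shard count
aws kinesis describe-stream-summary --stream-name my-test-stream
```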

Linking

For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
    <version>3.5.0</version>
</dependency>
By linking to this library, you will include ASL-licensed code in your application.

Creating a Kinesis Input Stream

You can create a Kinesis input DStream in your streaming application:
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark import StorageLevel

kinesisStream = KinesisUtils.createStream(
    streamingContext,
    [Kinesis app name],
    [Kinesis stream name],
    [endpoint URL],
    [region name],
    [initial position],
    [checkpoint interval],
    [metricsLevel.DETAILED],
    StorageLevel.MEMORY_AND_DISK_2
)
See the API documentation for more details.

Configuration Parameters

streamingContext
StreamingContext
required
The Spark StreamingContext containing an application name used by Kinesis to tie this application to the Kinesis stream.
Kinesis app name
string
required
The application name that will be used to checkpoint the Kinesis sequence numbers in a DynamoDB table.
  • The application name must be unique for a given account and region
  • If the table exists but has incorrect checkpoint information (for a different stream, or old expired sequence numbers), there may be temporary errors
Kinesis stream name
string
required
The name of the Kinesis stream that this streaming application will pull data from.
endpoint URL
string
required
Valid Kinesis endpoint URL. You can find valid endpoints in the AWS documentation.
region name
string
required
Valid Kinesis region name. See AWS regions documentation.
initial position
KinesisInitialPosition
required
The initial position to start reading from the stream:
  • KinesisInitialPositions.TrimHorizon: Start from the oldest available data
  • KinesisInitialPositions.Latest: Start from the latest data
  • KinesisInitialPositions.AtTimestamp(Date): Start from a specific timestamp (Scala/Java only)
checkpoint interval
Duration
required
The interval at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
metricsLevel
MetricsLevel
default:"DETAILED"
CloudWatch metrics level and dimensions. See AWS documentation about monitoring KCL for details.
storageLevel
StorageLevel
default:"MEMORY_AND_DISK_2"
Storage level for the Kinesis data in Spark.

Advanced Configuration

You can provide a message handler function to process Kinesis records with additional metadata:
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}

val kinesisStream = KinesisInputDStream.builder
    .streamingContext(streamingContext)
    .endpointUrl([endpoint URL])
    .regionName([region name])
    .streamName([streamName])
    .initialPosition([initial position])
    .checkpointAppName([Kinesis app name])
    .checkpointInterval([checkpoint interval])
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(
      Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
    .buildWithMessageHandler([message handler])
The message handler is a function that takes a KinesisClientRecord and returns a generic object.

Running the Example

To run the Kinesis example:
  1. Set up environment variables with your AWS credentials:
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
  2. Run the example:
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
    connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
    [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
  3. Generate test data using the Kinesis data producer:
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream.
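The producer's payload shape is easy to mimic locally when you want to test parsing logic without AWS. This hypothetical helper (not part of the example) builds lines in the same "N random numbers per line" format:

```python
import random

def make_lines(num_lines, words_per_line, max_word=10, seed=None):
    """Build test lines shaped like KinesisWordProducerASL output:
    each line is `words_per_line` space-separated random numbers.
    (Hypothetical helper, for local testing only.)"""
    rng = random.Random(seed)
    return [
        " ".join(str(rng.randint(1, max_word)) for _ in range(words_per_line))
        for _ in range(num_lines)
    ]

lines = make_lines(num_lines=5, words_per_line=10, seed=42)
print(lines[0])
```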

Runtime Characteristics

Kinesis data processing is ordered per partition and occurs at least once per message.
Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.
  • A single Kinesis stream shard is processed by one input DStream at a time
  • A single Kinesis input DStream can read from multiple shards by creating multiple KinesisRecordProcessor threads
  • You never need more Kinesis input DStreams than the number of Kinesis stream shards
  • Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances)
  • The Kinesis input DStream will balance the load between all DStreams, even across processes/instances
  • The load will be balanced during re-shard events (merging and splitting) due to changes in load
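KCL's lease-based balancing is dynamic, but the steady-state intuition is an even split of shards across receivers. A rough sketch of that split (illustrative only, not KCL's actual lease-stealing algorithm):

```python
def assign_shards(shard_ids, num_receivers):
    """Round-robin shards across receivers; receivers beyond the shard
    count simply get no shards. (Illustrative only -- KCL balances via
    DynamoDB shard leases, not a static round-robin.)"""
    assignments = {i: [] for i in range(num_receivers)}
    for idx, shard in enumerate(shard_ids):
        assignments[idx % num_receivers].append(shard)
    return assignments

shards = [f"shardId-{i:012d}" for i in range(4)]
print(assign_shards(shards, 2))
# two receivers end up with two shards each
```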

Kinesis Checkpointing

Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table:
  • This allows the system to recover from failures and continue processing where the DStream left off
  • Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling
  • The provided example handles throttling with a random-backoff-retry strategy
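The example's random-backoff-retry strategy can be sketched in plain Python; the exception handling and timing bounds here are illustrative, not the example's exact code:

```python
import random
import time

def retry_with_random_backoff(fn, max_attempts=4, base_wait=0.1, max_wait=1.0):
    """Retry fn(), sleeping a random interval (capped, growing
    exponentially per attempt) between tries -- a sketch of a
    random-backoff-retry strategy."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            wait = random.uniform(0, min(max_wait, base_wait * (2 ** attempt)))
            time.sleep(wait)

# Simulate a call that is throttled twice, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated throttling error")
    return "ok"

print(retry_with_random_backoff(flaky))  # "ok" after two simulated throttles
```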
When the input DStream starts with no checkpoint info:
  • KinesisInitialPositions.TrimHorizon: Starts from the oldest record available (may lead to duplicate processing)
  • KinesisInitialPositions.Latest: Starts from the latest tip (could lead to missed records if data is added while no input DStreams are running)
  • KinesisInitialPositions.AtTimestamp(Date): Starts from the specified UTC timestamp (Scala/Java only)

Kinesis Retry Configuration

You can configure retry behavior using Spark configuration properties:
spark.streaming.kinesis.retry.waitTime
duration
default:"100ms"
Wait time between Kinesis retries. When reading from Amazon Kinesis, users may hit ProvisionedThroughputExceededException when consuming faster than 5 transactions per second per shard or exceeding the maximum read rate of 2 MiB per second per shard.
spark.streaming.kinesis.retry.maxAttempts
int
default:"3"
Maximum number of retries for Kinesis fetches. This can be increased to have more retries for Kinesis reads when encountering throughput exceptions.
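When tuning these two properties together, it helps to estimate the worst-case latency they can add. A hypothetical helper, assuming a fixed wait between attempts (matching the defaults above; Spark's own duration parser accepts more unit forms than this sketch):

```python
def parse_duration_ms(s):
    """Parse a simple duration string like '100ms' or '1s' into
    milliseconds. (Hypothetical helper; Spark's parser is richer.)"""
    if s.endswith("ms"):
        return int(s[:-2])
    if s.endswith("s"):
        return int(s[:-1]) * 1000
    raise ValueError(f"unsupported duration: {s}")

def worst_case_retry_delay_ms(wait_time="100ms", max_attempts=3):
    """With a fixed wait between attempts, the worst case is
    (max_attempts - 1) sleeps before the final attempt."""
    return parse_duration_ms(wait_time) * (max_attempts - 1)

print(worst_case_retry_delay_ms())         # 200
print(worst_case_retry_delay_ms("1s", 5))  # 4000
```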

Record De-aggregation

When data is generated using the Kinesis Producer Library (KPL), messages may be aggregated for cost savings. Spark Streaming will automatically de-aggregate records during consumption.
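Aggregated KPL records carry a magic prefix in the payload. Spark de-aggregates for you, but if you ever inspect raw record bytes yourself, a minimal detector looks like this (the magic bytes are the published KPL aggregation constant; treat the sketch as illustrative):

```python
# KPL aggregated-record magic prefix (0xF3 0x89 0x9A 0xC2) -- assumed
# constant from the KPL aggregation format; verify against AWS docs.
KPL_MAGIC = b"\xf3\x89\x9a\xc2"

def is_kpl_aggregated(payload: bytes) -> bool:
    """Heuristic check: KPL aggregated records start with the magic
    prefix, followed by a protobuf body and a trailing MD5 checksum."""
    return payload.startswith(KPL_MAGIC)

print(is_kpl_aggregated(KPL_MAGIC + b"\x00\x01"))  # True
print(is_kpl_aggregated(b"plain user record"))     # False
```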

Best Practices

As a best practice, avoid re-shard jitter by over-provisioning shards when possible.
Monitor the DynamoDB table used for checkpointing to ensure it has sufficient read/write capacity.
Set your batch interval based on your latency requirements and the rate of incoming data. The checkpoint interval should typically match the batch interval.
Be prepared to handle ProvisionedThroughputExceededException by configuring appropriate retry settings.
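To check the checkpoint table's provisioning directly, you can query DynamoDB; the lease table is named after the Kinesis app name (placeholder below):

```shell
# Inspect the read/write capacity of the KCL checkpoint table
aws dynamodb describe-table --table-name my-kcl-app \
  --query 'Table.ProvisionedThroughput'
```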

Deploying

Package your application JAR with the necessary dependencies and deploy using spark-submit:
./bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.0 \
  --class com.example.KinesisApp \
  your-application.jar
For Python applications, add the packages using the --packages option:
./bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.0 \
  kinesis_app.py

Related Resources

  • Kafka Integration: integrate with Apache Kafka for stream processing
  • DStreams Programming Guide: learn more about Spark Streaming with DStreams
  • AWS Kinesis Docs: official Amazon Kinesis documentation
