Overview
Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. The Kinesis integration for Spark Streaming creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load balancing, fault tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.

Prerequisites
Before you begin, you need:
- An AWS account with Kinesis access
- A Kinesis stream set up in one of the valid Kinesis endpoints
- AWS credentials configured (access key and secret key)
You can create a Kinesis stream following the AWS Kinesis documentation.
Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact.

By linking to this library, you will include ASL-licensed code in your application.
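The exact artifact coordinates are not reproduced here; assuming Spark's standard naming for the Kinesis ASL module, an SBT dependency would look like the following (the Scala binary version suffix and Spark version are placeholders — match them to your build):

```
// build.sbt: Scala binary suffix (_2.12) and version (3.5.0) are placeholders
libraryDependencies += "org.apache.spark" % "spark-streaming-kinesis-asl_2.12" % "3.5.0"
```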
Creating a Kinesis Input Stream
You can create a Kinesis input DStream in your streaming application:
- Python
- Scala
- Java
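As a minimal Scala sketch using the `KinesisInputDStream` builder — the stream name, endpoint URL, region, and application name below are placeholders you must replace with your own values:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

val conf = new SparkConf().setAppName("KinesisExample")
val ssc = new StreamingContext(conf, Seconds(2))

// Build a DStream of raw record payloads (Array[Byte]) from the Kinesis stream.
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("myKinesisStream")                          // placeholder stream name
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com") // placeholder endpoint
  .regionName("us-east-1")                                // placeholder region
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointAppName("myKinesisApp")                      // also names the DynamoDB table
  .checkpointInterval(Seconds(2))                         // start by matching the batch interval
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
```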
Configuration Parameters
- streamingContext: The Spark StreamingContext containing an application name used by Kinesis to tie this application to the Kinesis stream.
- checkpointAppName: The application name that will be used to checkpoint the Kinesis sequence numbers in a DynamoDB table.
- streamName: The name of the Kinesis stream that this streaming application will pull data from.
- endpointUrl: A valid Kinesis endpoint URL. You can find valid endpoints in the AWS documentation.
- regionName: A valid Kinesis region name. See the AWS regions documentation.
- initialPosition: The initial position to start reading from the stream:
  - KinesisInitialPositions.TrimHorizon: start from the oldest available data
  - KinesisInitialPositions.Latest: start from the latest data
  - KinesisInitialPositions.AtTimestamp(Date): start from a specific timestamp (Scala/Java only)
- checkpointInterval: The interval at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same value as the batch interval of the streaming application.
- metricsLevel / metricsEnabledDimensions: CloudWatch metrics level and dimensions. See the AWS documentation about monitoring the KCL for details.
- storageLevel: Storage level for the received Kinesis data in Spark.
Advanced Configuration
You can provide a message handler function to process Kinesis records with additional metadata:
- Scala
- Java
The message handler takes a KinesisClientRecord and returns a generic object.
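A Scala sketch of such a handler — the function name and the choice to pair the partition key with the payload are illustrative, not part of the API:

```scala
import software.amazon.kinesis.retrieval.KinesisClientRecord

// Hypothetical handler: pair each record's partition key with its payload bytes.
def recordHandler(record: KinesisClientRecord): (String, Array[Byte]) = {
  val payload = new Array[Byte](record.data().remaining())
  record.data().get(payload)
  (record.partitionKey(), payload)
}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  // ... same required options as in the basic example ...
  .buildWithMessageHandler(recordHandler)
```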
Running the Example
To run the Kinesis example:
- Set up environment variables with your AWS credentials:
- Run the example:
- Python
- Scala
- Java
- Generate test data using the Kinesis data producer:
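The steps above can be sketched as shell commands. The `KinesisWordCountASL` and `KinesisWordProducerASL` example classes ship with Spark, but the artifact version, app name, stream name, endpoint, and producer arguments below are all placeholders:

```shell
# Credentials picked up by the AWS default credentials provider chain (placeholders):
export AWS_ACCESS_KEY_ID=<your-access-key>
export AWS_SECRET_ACCESS_KEY=<your-secret-key>

# Run the Scala word-count example (app name, stream name, endpoint are placeholders):
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.0 \
  streaming.KinesisWordCountASL myKinesisApp myKinesisStream \
  https://kinesis.us-east-1.amazonaws.com

# Generate test data with the bundled producer (rate arguments are illustrative):
./bin/run-example streaming.KinesisWordProducerASL myKinesisStream \
  https://kinesis.us-east-1.amazonaws.com 10 5
```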
Runtime Characteristics
Data Processing Order
Kinesis data processing is ordered per partition and occurs at least once per message.
Multiple Applications
Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.
Shard Processing
A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.
Scaling
- Horizontal scaling is achieved by adding or removing Kinesis input DStreams (within a single process or across multiple processes/instances)
- The Kinesis input DStreams balance the load among themselves, even across processes/instances
- The load is rebalanced during re-shard events (shard merging and splitting) caused by changes in load
Kinesis Checkpointing
Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table:
- This allows the system to recover from failures and continue processing where the DStream left off
- Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling
- The provided example handles throttling with a random-backoff-retry strategy
If no checkpoint info exists when the input DStream starts, it begins from the configured initial position:
- KinesisInitialPositions.TrimHorizon: starts from the oldest record available (may lead to duplicate processing)
- KinesisInitialPositions.Latest: starts from the latest tip (could lead to missed records if data is added while no input DStreams are running)
- KinesisInitialPositions.AtTimestamp(Date): starts from the specified UTC timestamp (Scala/Java only)
Kinesis Retry Configuration
You can configure retry behavior using Spark configuration properties:
- spark.streaming.kinesis.retry.waitTime: Wait time between Kinesis retries. When reading from Amazon Kinesis, users may hit ProvisionedThroughputExceededException when consuming faster than 5 transactions/second or exceeding the maximum read rate of 2 MiB/second.
- spark.streaming.kinesis.retry.maxAttempts: Maximum number of retries for Kinesis fetches. This can be increased to allow more retries for Kinesis reads when encountering throughput exceptions.
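Assuming the integration exposes `spark.streaming.kinesis.retry.waitTime` and `spark.streaming.kinesis.retry.maxAttempts` as configuration properties, they could be set at submit time like this (the values shown are illustrative, not recommendations):

```shell
# Illustrative retry settings; tune to your stream's provisioned throughput.
./bin/spark-submit \
  --conf spark.streaming.kinesis.retry.waitTime=500ms \
  --conf spark.streaming.kinesis.retry.maxAttempts=5 \
  my-kinesis-app.jar
```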
Record De-aggregation
When data is generated using the Kinesis Producer Library (KPL), messages may be aggregated for cost savings. Spark Streaming will automatically de-aggregate records during consumption.

Best Practices
Avoid Re-shard Jitter
As a best practice, avoid re-shard jitter by over-provisioning shards when possible.
Monitor DynamoDB Table
Monitor the DynamoDB table used for checkpointing to ensure it has sufficient read/write capacity.
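One way to spot-check the checkpoint table's capacity, assuming the AWS CLI is configured and that the table name matches the checkpoint application name (here a placeholder):

```shell
# Show provisioned throughput for the checkpoint table (table name is a placeholder):
aws dynamodb describe-table --table-name myKinesisApp \
  --query 'Table.ProvisionedThroughput'
```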
Configure Batch Interval
Set your batch interval based on your latency requirements and the rate of incoming data. The checkpoint interval should typically match the batch interval.
Handle Throttling
Be prepared to handle ProvisionedThroughputExceededException by configuring appropriate retry settings.

Deploying
Package your application JAR with the necessary dependencies and deploy using spark-submit, or pull in the Kinesis integration at submit time with the --packages option:
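An illustrative spark-submit invocation — the artifact coordinates, version, main class, and JAR name are all assumptions to be replaced with your own:

```shell
# Illustrative deployment; coordinates, class name, and JAR name are placeholders.
./bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.0 \
  --class com.example.MyKinesisApp \
  my-kinesis-app.jar
```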
Related Resources
- Kafka Integration: Integrate with Apache Kafka for stream processing
- DStreams Programming Guide: Learn more about Spark Streaming with DStreams
- AWS Kinesis Docs: Official Amazon Kinesis documentation
