
Overview

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data; the Spark SQL engine takes care of running it incrementally and continuously, updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, and more, all executed on the same optimized Spark SQL engine. The system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs.

Internally, by default, Structured Streaming queries are processed by a micro-batch engine, which treats a data stream as a series of small batch jobs, achieving end-to-end latencies as low as 100 milliseconds with exactly-once guarantees. Since Spark 2.3, a low-latency processing mode called Continuous Processing can achieve end-to-end latencies as low as 1 millisecond, with at-least-once guarantees.

Quick Example

Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Here’s how you can express this using Structured Streaming:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
This lines DataFrame represents an unbounded table containing the streaming text data. Each line arriving on the socket becomes a row in a single string column named value, which is why the code above splits lines.value into words. As new data arrives, the Spark SQL engine runs the query incrementally and continuously, updating the running counts.

Creating Streaming DataFrames

You can create streaming DataFrames through the DataStreamReader interface returned by SparkSession.readStream. As with the read interface for creating static DataFrames, you can specify the details of the source: data format, schema, options, and so on.

Input Sources

Structured Streaming has several built-in sources.

File Source

Reads files written in a directory as a stream of data. Supported file formats include text, CSV, JSON, ORC, and Parquet.
from pyspark.sql.types import StructType

# Read all the CSV files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")
Files must be atomically placed in the given directory, which in most file systems can be achieved by file move operations.
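For example, a Python producer can meet this requirement by writing the file somewhere else on the same filesystem and then moving it into the watched directory in one step. A minimal sketch (the publish_atomically helper is illustrative, not part of any Spark API):

```python
import os
import tempfile

def publish_atomically(watched_dir: str, filename: str, data: str) -> str:
    """Write data to a temporary file, then move it into the watched
    directory so the stream never observes a half-written file."""
    # Stage the file in the parent of the watched directory (same filesystem)
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(watched_dir) or ".")
    with os.fdopen(fd, "w") as f:
        f.write(data)
    final_path = os.path.join(watched_dir, filename)
    os.rename(tmp_path, final_path)  # atomic move on POSIX filesystems
    return final_path
```

Because the move is atomic, the file source either sees the whole file or nothing, never a partial write.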
Kafka Source

Reads data from Kafka. Compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
Socket Source

Reads UTF-8 text data from a socket connection. The listening server socket is at the driver. This source should be used only for testing, as it does not provide end-to-end fault-tolerance guarantees.
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
Rate Source

Generates data at the specified number of rows per second; each output row contains a timestamp and a value. This source is intended for testing and benchmarking.
rateDF = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load()

Operations on Streaming DataFrames

You can apply most common operations on streaming DataFrames – ranging from untyped, SQL-like operations (e.g. select, where, groupBy) to typed RDD-like operations (e.g. map, filter, flatMap).

Basic Operations

# Streaming DataFrame with IoT device data
df = ...  # schema: { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()

Window Operations

Structured Streaming supports aggregations over event-time windows, so you can group data by the time window each event falls into rather than by when it arrived. A watermark tells the engine how late data may arrive, bounding the state it must keep.
from pyspark.sql.functions import window

# words: streaming DataFrame with schema { timestamp: Timestamp, word: String }
# Count words within 10-minute windows sliding every 5 minutes, using a
# watermark to drop state for data arriving more than 10 minutes late
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word
    ).count()
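To see concretely which windows a row lands in, here is a plain-Python sketch (not the Spark API) of the sliding-window assignment performed by window(timestamp, "10 minutes", "5 minutes"), using plain seconds for readability:

```python
def sliding_windows(ts: int, window_len: int, slide: int) -> list:
    """Return every [start, end) window of length window_len, starting on a
    multiple of slide, that contains the event time ts."""
    start = ts - (ts % slide)  # latest window start at or before ts
    windows = []
    while start > ts - window_len:
        windows.append((start, start + window_len))
        start -= slide
    return sorted(windows)

# An event at t=12s with 10s windows sliding every 5s falls into two windows
sliding_windows(12, 10, 5)  # [(5, 15), (10, 20)]
```

Each row is therefore counted once per overlapping window, which is why a sliding window produces more output rows than a tumbling one.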

Output Sinks

Structured Streaming provides several output sinks for writing streaming data:

File Sink

Stores the output to a directory in supported file formats (Parquet, ORC, JSON, CSV). Supports append mode only.

Kafka Sink

Writes output to one or more Kafka topics

Console Sink

Prints output to console/stdout (for debugging)

Memory Sink

Stores the output in the driver's memory as an in-memory table (for debugging on low data volumes, since the entire output is collected in memory)

Output Modes

When writing streaming data, you can specify one of three output modes:
  • Append mode (default): only rows appended to the result table since the last trigger are written to the external storage
  • Complete mode: the entire updated result table is written to the external storage after every trigger; supported only for aggregation queries
  • Update mode: only rows that were updated in the result table since the last trigger are written to the external storage
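To build intuition for how the modes differ, here is a plain-Python sketch (not the Spark API) of a running word count, showing what complete and update modes would emit after each micro-batch:

```python
from collections import Counter

def emitted_rows(batches):
    """Simulate a running word count over a sequence of micro-batches and
    return what complete mode and update mode would emit after each one."""
    counts = Counter()
    complete_out, update_out = [], []
    for batch in batches:
        changed = Counter(batch)
        counts.update(changed)
        # Complete mode: the entire result table, every trigger
        complete_out.append(dict(counts))
        # Update mode: only the rows whose counts changed this trigger
        update_out.append({word: counts[word] for word in changed})
    return complete_out, update_out

complete_out, update_out = emitted_rows([["cat", "dog", "cat"], ["dog"]])
# complete mode re-emits the full table each time; update mode only the changes
```

Append mode is absent from this sketch because, for a running aggregation, a row is only final once a watermark guarantees its group can no longer change, which is why append mode requires a watermark for aggregation queries.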
# Append mode
query = wordCounts \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/output/path") \
    .option("checkpointLocation", "/checkpoint/path") \
    .start()

# Complete mode
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Fault Tolerance

Structured Streaming provides end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. Checkpointing is enabled by setting the checkpointLocation option:
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/path/to/checkpoint/dir") \
    .start()
The checkpoint location must be a path in an HDFS-compatible file system that is accessible by both the driver and executors.

Managing Streaming Queries

When you start a streaming query using start(), you get a StreamingQuery object that you can use to manage the query:
query = wordCounts.writeStream.format("console").start()

# Get the unique identifier of the running query
query.id

# Get the auto-generated or user-specified name of the query
query.name

# Get the list of currently active streaming queries
spark.streams.active

# Block until query is terminated
query.awaitTermination()

# Stop the query
query.stop()

Next Steps

Kafka Integration

Learn how to integrate Structured Streaming with Apache Kafka

DStreams Programming Guide

Explore the legacy DStreams API (Spark Streaming)

Kinesis Integration

Connect to Amazon Kinesis for real-time data processing
