Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.

You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, and more. The computation is executed on the same optimized Spark SQL engine. The system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs.

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. Since Spark 2.3, a new low-latency processing mode called Continuous Processing can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees.
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Here’s how you can express this using Structured Streaming:
Python
Scala
Java
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
This lines DataFrame represents an unbounded input table containing the streaming text data; each line arriving on the socket becomes a new row appended to the table. The Spark SQL engine runs the query on it incrementally and continuously, updating the final result as streaming data continues to arrive.
You can create streaming DataFrames through the DataStreamReader interface returned by SparkSession.readStream(). Similar to the read interface for creating static DataFrames, you can specify the details of the source – data format, schema, options, etc.
Socket source (for testing): Reads UTF8 text data from a socket connection. The listening server socket is at the driver. This should be used only for testing, as it does not provide end-to-end fault-tolerance guarantees.
Rate source (for testing): Generates data at the specified number of rows per second. Each output row contains a timestamp and value. This source is intended for testing and benchmarking.
You can apply most common operations on streaming DataFrames – ranging from untyped, SQL-like operations (e.g. select, where, groupBy) to typed RDD-like operations (e.g. map, filter, flatMap).
# streaming DataFrame with IOT device data
df = ...  # schema: { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data
val ds: Dataset[DeviceData] = df.as[DeviceData]

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()
// streaming DataFrame with IOT device data
Dataset<Row> df = ...

// Select the devices which have signal more than 10
df.select("device").where("signal > 10");

// Running count of the number of updates for each device type
df.groupBy("deviceType").count();
Structured Streaming supports aggregations over event-time windows: rows are grouped by the time window their event timestamp falls into, and a watermark tells the engine how long to wait for late-arriving data.
# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word
    ).count()
Structured Streaming provides end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. Checkpointing is enabled by setting the checkpointLocation option:
When you start a streaming query using start(), you get a StreamingQuery object that you can use to manage the query:
Python
Scala
query = wordCounts.writeStream.format("console").start()

# Get the unique identifier of the running query
query.id

# Get the auto-generated or user-specified name of the query
query.name

# Get the list of currently active streaming queries
spark.streams.active

# Block until the query is terminated
query.awaitTermination()

# Stop the query
query.stop()
val query = wordCounts.writeStream.format("console").start()

// Get the unique identifier of the running query
query.id

// Get the auto-generated or user-specified name of the query
query.name

// Get the list of currently active streaming queries
spark.streams.active

// Block until the query is terminated
query.awaitTermination()

// Stop the query
query.stop()