Overview

Starting from release 0.6.0, Delta Sharing tables can be used as data sources for Spark Structured Streaming. This enables real-time processing of shared data as new records are added to the table.
The data provider must share the table with history enabled for streaming to work. Contact your data provider if streaming is not available for a particular table.
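How history gets enabled depends on the provider's sharing server. As one example, here is a minimal sketch of a table entry for the open-source reference server's YAML config, assuming its historyShared table flag; all names and paths are placeholders:

# delta-sharing-server config sketch: historyShared must be true for streaming
shares:
- name: "my-share"
  schemas:
  - name: "my-schema"
    tables:
    - name: "my-table"
      location: "s3a://my-bucket/path/to/delta-table"
      historyShared: true   # enables history-based queries such as streaming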

Basic Streaming

To create a streaming DataFrame from a Delta Sharing table, use readStream instead of read:
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

val streamingDF = spark.readStream
  .format("deltaSharing")
  .load(tablePath)

// Write to console for testing
streamingDF.writeStream
  .format("console")
  .start()
  .awaitTermination()

Streaming Options

Delta Sharing streaming supports several options to control how data is read:

startingVersion

Specify which version of the table to start streaming from:

val streamingDF = spark.readStream
  .format("deltaSharing")
  .option("startingVersion", "1")
  .load(tablePath)

# Start from the beginning (version 0)
streaming_df = spark.readStream \
  .format("deltaSharing") \
  .option("startingVersion", "0") \
  .load(table_path)

Processes all historical data from the table's first version.

# Start from version 10
streaming_df = spark.readStream \
  .format("deltaSharing") \
  .option("startingVersion", "10") \
  .load(table_path)

Begins processing from a specific table version, skipping earlier versions.

# Start from latest version (default behavior)
streaming_df = spark.readStream \
  .format("deltaSharing") \
  .load(table_path)

Only processes new data added after the stream starts.

skipChangeCommits

Control whether commits that update or delete existing records are skipped:
val streamingDF = spark.readStream
  .format("deltaSharing")
  .option("startingVersion", "1")
  .option("skipChangeCommits", "true")
  .load(tablePath)
Set skipChangeCommits to "true" when you only need newly appended data. With the option enabled, commits containing UPDATE or DELETE operations are skipped entirely, so those changes never appear in the stream.

Complete Example

Here’s a full example demonstrating streaming with all options:
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._ // required for the $ column syntax below

val tablePath = "/opt/profiles/production.share#analytics.streaming.events"

// Create streaming DataFrame
val streamingDF = spark.readStream
  .format("deltaSharing")
  .option("startingVersion", "1")
  .option("skipChangeCommits", "true")
  .load(tablePath)

// Process the stream. A streaming aggregation without a watermarked window can
// only run in complete or update mode, and file sinks such as parquet support
// only append, so the running counts go to the console sink in complete mode.
val query = streamingDF
  .filter($"event_type" === "purchase")
  .groupBy($"user_id")
  .count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "/checkpoints/purchases")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

Output Modes

Delta Sharing streaming supports all Spark Structured Streaming output modes:
# Append mode - only new rows
streaming_df.writeStream \
  .format("parquet") \
  .outputMode("append") \
  .option("path", "/output/data") \
  .option("checkpointLocation", "/checkpoints/data") \
  .start()
Best for append-only operations. Only newly added rows are written to the sink.
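Aggregating queries cannot use append mode without a watermarked window, but they can use complete (or update) mode with a sink that supports it. A minimal sketch using the console sink; the event_type column is a placeholder:

# Complete mode - the full aggregated result is rewritten every batch
streaming_df.groupBy("event_type").count() \
  .writeStream \
  .format("console") \
  .outputMode("complete") \
  .start()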

Triggers

Control how often the streaming query processes new data:
// Default trigger: run each micro-batch as soon as the previous one completes
val query = streamingDF.writeStream
  .format("parquet")
  .option("path", "/output")
  .option("checkpointLocation", "/checkpoints")
  .start()
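To set an explicit trigger, pass one to writeStream. A sketch (in Python) of the two trigger types commonly used with this connector on Spark 3.1.1; Trigger.AvailableNow is not among them (see Limitations):

# Micro-batch every 30 seconds
streaming_df.writeStream \
  .format("parquet") \
  .option("path", "/output/data") \
  .option("checkpointLocation", "/checkpoints/data") \
  .trigger(processingTime="30 seconds") \
  .start()

# Run a single micro-batch over the available data, then stop
streaming_df.writeStream \
  .format("parquet") \
  .option("path", "/output/once") \
  .option("checkpointLocation", "/checkpoints/once") \
  .trigger(once=True) \
  .start()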

Stream Processing Patterns

Windowed Aggregations

from pyspark.sql.functions import window, col

# Assuming the table has a timestamp column
windowed_counts = streaming_df \
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("event_type")
  ) \
  .count()

windowed_counts.writeStream \
  .format("console") \
  .outputMode("complete") \
  .start() \
  .awaitTermination()

Watermarking for Late Data

import org.apache.spark.sql.functions._
import spark.implicits._ // required for the $ column syntax below

val withWatermark = streamingDF
  .withWatermark("timestamp", "1 hour")
  .groupBy(
    window($"timestamp", "10 minutes"),
    $"user_id"
  )
  .count()

withWatermark.writeStream
  .format("parquet")
  .outputMode("append")
  .option("path", "/output/windowed")
  .option("checkpointLocation", "/checkpoints/windowed")
  .start()

Stateful Operations

# Deduplication: drop duplicate events within a 24-hour watermark
deduplicated = streaming_df \
  .withWatermark("timestamp", "24 hours") \
  .dropDuplicates(["user_id", "event_id"])

deduplicated.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("path", "/output/deduplicated") \
  .option("checkpointLocation", "/checkpoints/deduplicated") \
  .start()

Monitoring Streams

Monitor your streaming queries for health and performance:
val query = streamingDF.writeStream
  .format("parquet")
  .option("path", "/output")
  .option("checkpointLocation", "/checkpoints")
  .start()

// Get query status
println(s"Query ID: ${query.id}")
println(s"Query Name: ${query.name}")
println(s"Is Active: ${query.isActive}")

// Get last progress
val progress = query.lastProgress
if (progress != null) {
  println(s"Batch ID: ${progress.batchId}")
  println(s"Input Rows: ${progress.numInputRows}")
  println(s"Processing Rate: ${progress.processedRowsPerSecond}")
}

// Get recent progress
query.recentProgress.foreach(p => {
  println(s"Batch ${p.batchId}: ${p.numInputRows} rows")
})

Limitations

Current Limitations:
  1. Trigger.AvailableNow is not supported - the Delta Sharing connector is built on Spark 3.1.1, which predates this trigger type (introduced in Spark 3.3.0)
  2. Table must have history enabled - The data provider must configure the shared table with history sharing enabled
  3. No time travel in streaming - You cannot use time travel options in streaming queries; see the sketch after this list
  4. Change data feed commits - If skipChangeCommits is false, UPDATE and DELETE operations will be included in the stream
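To illustrate limitation 3: pinning a read to a historical version works only in batch mode, assuming the connector's versionAsOf option; a streaming query can only choose where to start, via startingVersion:

# Batch read pinned to a single historical version (time travel)
historical_df = spark.read \
  .format("deltaSharing") \
  .option("versionAsOf", "5") \
  .load(table_path)

# Streaming cannot be pinned to a version; it can only start from one
streaming_df = spark.readStream \
  .format("deltaSharing") \
  .option("startingVersion", "5") \
  .load(table_path)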

Checkpointing

Checkpoints are essential for fault tolerance in streaming:
# Always specify a checkpoint location
streaming_df.writeStream \
  .format("parquet") \
  .option("path", "/output/events") \
  .option("checkpointLocation", "/checkpoints/events") \
  .start()
Checkpoint Best Practices:
  • Use a reliable storage system (HDFS, S3, ADLS) for checkpoints
  • Don’t reuse checkpoint locations between different queries (see the sketch below)
  • Keep checkpoint locations separate from output paths
  • Checkpoint locations should be persistent across application restarts
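A sketch of the layout those practices imply: each query writes to its own checkpoint directory, kept separate from its output path (all paths are placeholders):

# One dedicated checkpoint directory per query, never shared
raw_query = streaming_df.writeStream \
  .format("parquet") \
  .option("path", "/output/raw") \
  .option("checkpointLocation", "/checkpoints/raw") \
  .start()

purchases_query = streaming_df \
  .filter("event_type = 'purchase'") \
  .writeStream \
  .format("parquet") \
  .option("path", "/output/purchases") \
  .option("checkpointLocation", "/checkpoints/purchases") \
  .start()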

Error Handling

try {
  val query = streamingDF.writeStream
    .format("parquet")
    .option("path", "/output")
    .option("checkpointLocation", "/checkpoints")
    .start()
  
  // Monitor for exceptions
  while (query.isActive) {
    if (query.exception.isDefined) {
      println(s"Query failed with error: ${query.exception.get}")
      query.stop()
    }
    Thread.sleep(5000)
  }
} catch {
  case e: Exception => 
    println(s"Streaming query failed: ${e.getMessage}")
    // Implement retry logic or alerting
}

Next Steps

Change Data Feed

Query table changes with CDF

Quick Start

Return to basic usage examples
