Overview
Starting from release 0.6.0, Delta Sharing tables can be used as data sources for Spark Structured Streaming. This enables real-time processing of shared data as new records are added to the table.
The data provider must share the table with history enabled for streaming to work. Contact your data provider if streaming is not available for a particular table.
Basic Streaming
To create a streaming DataFrame from a Delta Sharing table, use readStream instead of read:
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

val streamingDF = spark.readStream
  .format("deltaSharing")
  .load(tablePath)

// Write to console for testing
streamingDF.writeStream
  .format("console")
  .start()
  .awaitTermination()
table_path = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

streaming_df = spark.readStream \
    .format("deltaSharing") \
    .load(table_path)

# Write to console for testing
streaming_df.writeStream \
    .format("console") \
    .start() \
    .awaitTermination()
String tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>";

Dataset<Row> streamingDF = spark.readStream()
  .format("deltaSharing")
  .load(tablePath);

// Write to console for testing
streamingDF.writeStream()
  .format("console")
  .start()
  .awaitTermination();
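All three APIs accept the same table path string: the profile file location, a `#`, then the fully qualified table name. As a sanity check on that shape, here is a minimal pure-Python sketch that splits such a path into its components (`parse_table_path` is a hypothetical helper for illustration, not part of the connector):

```python
def parse_table_path(path: str) -> dict:
    """Split a Delta Sharing table path of the form
    <profile-file-path>#<share>.<schema>.<table> into its components.
    Hypothetical helper, shown only to document the path format."""
    profile, _, qualified = path.partition("#")
    share, schema, table = qualified.split(".")
    return {"profile": profile, "share": share, "schema": schema, "table": table}
```

Note that the profile path itself may contain dots (e.g. a `.share` file extension), which is why the path is split on `#` before splitting the qualified table name.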
Streaming Options
Delta Sharing streaming supports several options to control how data is read:
startingVersion
Specify which version of the table to start streaming from:
val streamingDF = spark.readStream
  .format("deltaSharing")
  .option("startingVersion", "1")
  .load(tablePath)
# Start from the beginning (version 0)
streaming_df = spark.readStream \
    .format("deltaSharing") \
    .option("startingVersion", "0") \
    .load(table_path)
Processes all historical data from the table’s first version.
# Start from version 10
streaming_df = spark.readStream \
    .format("deltaSharing") \
    .option("startingVersion", "10") \
    .load(table_path)
Begins processing from a specific table version, skipping earlier versions.
# Start from latest version (default behavior)
streaming_df = spark.readStream \
    .format("deltaSharing") \
    .load(table_path)
Only processes new data added after the stream starts.
skipChangeCommits
Control whether to include change data feed (CDF) commits:
val streamingDF = spark.readStream
  .format("deltaSharing")
  .option("startingVersion", "1")
  .option("skipChangeCommits", "true")
  .load(tablePath)
Set skipChangeCommits to "true" when streaming append-only data. If the shared table includes UPDATE or DELETE operations, these will be skipped when this option is enabled.
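The option's effect on which commits feed the stream can be sketched as follows (illustrative pure Python; real commit metadata is richer than a single flag):

```python
def versions_to_read(commits: list, skip_change_commits: bool) -> list:
    """Illustrative: each commit is a (version, append_only) pair. With
    skipChangeCommits enabled, commits containing UPDATE or DELETE
    operations are dropped from the stream; otherwise all are read."""
    if skip_change_commits:
        return [v for v, append_only in commits if append_only]
    return [v for v, _ in commits]
```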
Complete Example
Here’s a full example demonstrating streaming with all options:
import org.apache.spark.sql.streaming.Trigger

val tablePath = "/opt/profiles/production.share#analytics.streaming.events"

// Create streaming DataFrame
val streamingDF = spark.readStream
  .format("deltaSharing")
  .option("startingVersion", "1")
  .option("skipChangeCommits", "true")
  .load(tablePath)

// Process the stream: keep purchase events and persist them as Parquet.
// (A streaming groupBy/count would require update or complete output mode,
// which the file sink does not support.)
val query = streamingDF
  .filter($"event_type" === "purchase")
  .writeStream
  .format("parquet")
  .option("path", "/output/purchases")
  .option("checkpointLocation", "/checkpoints/purchases")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()
Output Modes
Delta Sharing streaming supports all Spark Structured Streaming output modes:
# Append mode - only new rows
streaming_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "/output/data") \
    .option("checkpointLocation", "/checkpoints/data") \
    .start()
Best for append-only operations. Only newly added rows are written to the sink.
# Complete mode - entire result table
streaming_df \
    .groupBy("category").count() \
    .writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("category_counts") \
    .start()
Used with aggregations. The entire result table is written to the sink after each trigger.
# Update mode - only updated rows
# Update mode needs a sink that supports it (e.g. console); file sinks do not
streaming_df \
    .groupBy("user_id").count() \
    .writeStream \
    .format("console") \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/user_counts") \
    .start()
Only rows that were updated in the result table are written to the sink.
Triggers
Control how often the streaming query processes new data:
Default (micro-batch): start the next micro-batch as soon as the previous one completes
Fixed interval: start micro-batches on a fixed schedule, e.g. Trigger.ProcessingTime("10 seconds")
Once (single batch): process all data available at start in one batch, then stop
Continuous: experimental low-latency continuous processing
// Default: process as soon as the previous batch completes
val query = streamingDF.writeStream
  .format("parquet")
  .option("path", "/output")
  .option("checkpointLocation", "/checkpoints")
  .start()
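For a fixed-interval trigger such as Trigger.ProcessingTime("10 seconds"), the scheduling rule is roughly the following (a pure-Python sketch of the concept, not connector code):

```python
def next_batch_start(last_start: float, interval_sec: float, now: float) -> float:
    """When the next micro-batch begins under a fixed-interval trigger:
    at the next interval boundary after the last batch started, or
    immediately if the previous batch overran that boundary."""
    boundary = (last_start // interval_sec + 1) * interval_sec
    return max(boundary, now)
```

A batch that finishes early waits for the next boundary; a batch that overruns its interval is followed immediately by the next one.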
Stream Processing Patterns
Windowed Aggregations
from pyspark.sql.functions import window, col

# Assuming the table has a timestamp column
windowed_counts = streaming_df \
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("event_type")
    ) \
    .count()

windowed_counts.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start() \
    .awaitTermination()
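window(col("timestamp"), "10 minutes", "5 minutes") defines 10-minute windows that advance every 5 minutes, so each event lands in two overlapping windows. A pure-Python sketch of that assignment (timestamps in seconds, illustrative only):

```python
def sliding_windows(ts: int, window_sec: int, slide_sec: int) -> list:
    """Return the [start, end) ranges of every sliding window containing
    `ts`, for windows of `window_sec` seconds advancing every `slide_sec`
    seconds, with window starts aligned to multiples of the slide."""
    first = (ts // slide_sec) * slide_sec - window_sec + slide_sec
    return [(start, start + window_sec)
            for start in range(first, ts + 1, slide_sec)
            if start <= ts < start + window_sec]
```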
Watermarking for Late Data
import org.apache.spark.sql.functions._

val withWatermark = streamingDF
  .withWatermark("timestamp", "1 hour")
  .groupBy(
    window($"timestamp", "10 minutes"),
    $"user_id"
  )
  .count()

withWatermark.writeStream
  .format("parquet")
  .outputMode("append")
  .option("path", "/output/windowed")
  .option("checkpointLocation", "/checkpoints/windowed")
  .start()
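The watermark itself is simple arithmetic: the maximum event time seen so far minus the allowed delay, and events that fall behind it are dropped. A pure-Python sketch (illustrative only):

```python
def is_dropped(event_ts: float, max_seen_ts: float, delay_sec: float) -> bool:
    """An event is dropped once its timestamp falls behind the watermark,
    i.e. the maximum event time observed minus the allowed delay."""
    watermark = max_seen_ts - delay_sec
    return event_ts < watermark
```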
Stateful Operations
# Deduplication with a watermark to bound state
deduplicated = streaming_df \
    .withWatermark("timestamp", "24 hours") \
    .dropDuplicates(["user_id", "event_id"])

deduplicated.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("path", "/output/deduplicated") \
    .option("checkpointLocation", "/checkpoints/deduplicated") \
    .start()
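Conceptually, dropDuplicates with a watermark keeps a state table of keys seen so far and evicts entries once the watermark passes them. A pure-Python sketch of that behavior (timestamps in seconds, illustrative only):

```python
def dedup_stream(events: list, delay_sec: float) -> list:
    """Watermarked deduplication sketch: drop repeated keys, evicting
    state once the watermark (max event time minus delay) passes it.
    `events` is a list of (timestamp, key) pairs in arrival order."""
    seen = {}                      # key -> event time when first seen
    max_ts = float("-inf")
    kept = []
    for ts, key in events:
        max_ts = max(max_ts, ts)
        watermark = max_ts - delay_sec
        # evict expired entries so state does not grow without bound
        seen = {k: t for k, t in seen.items() if t >= watermark}
        if key not in seen:
            seen[key] = ts
            kept.append((ts, key))
    return kept
```

Note the trade-off this makes visible: once a key's state is evicted, a very late duplicate of it would be emitted again, which is why the watermark bounds both state size and the deduplication guarantee.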
Monitoring Streams
Monitor your streaming queries for health and performance:
val query = streamingDF.writeStream
  .format("parquet")
  .option("path", "/output")
  .option("checkpointLocation", "/checkpoints")
  .start()

// Get query status
println(s"Query ID: ${query.id}")
println(s"Query Name: ${query.name}")
println(s"Is Active: ${query.isActive}")

// Get last progress
val progress = query.lastProgress
if (progress != null) {
  println(s"Batch ID: ${progress.batchId}")
  println(s"Input Rows: ${progress.numInputRows}")
  println(s"Processing Rate: ${progress.processedRowsPerSecond}")
}

// Get recent progress
query.recentProgress.foreach { p =>
  println(s"Batch ${p.batchId}: ${p.numInputRows} rows")
}
Limitations
Current Limitations:
Trigger.AvailableNow is not supported - Delta Sharing uses Spark 3.1.1, which doesn’t include this trigger type (available since Spark 3.3.0)
Table must have history enabled - The data provider must configure the shared table with history sharing enabled
No time travel in streaming - You cannot use time travel options in streaming queries
Change data feed commits - If skipChangeCommits is false, UPDATE and DELETE operations will be included in the stream
Checkpointing
Checkpoints are essential for fault tolerance in streaming:
# Always specify a checkpoint location
streaming_df.writeStream \
    .format("parquet") \
    .option("path", "/output/events") \
    .option("checkpointLocation", "/checkpoints/events") \
    .start()
Checkpoint Best Practices:
Use a reliable storage system (HDFS, S3, ADLS) for checkpoints
Don’t reuse checkpoint locations between different queries
Keep checkpoint locations separate from output paths
Checkpoint locations should be persistent across application restarts
Error Handling
try {
  val query = streamingDF.writeStream
    .format("parquet")
    .option("path", "/output")
    .option("checkpointLocation", "/checkpoints")
    .start()

  // Monitor for exceptions while the query runs
  while (query.isActive) {
    if (query.exception.isDefined) {
      println(s"Query failed with error: ${query.exception.get}")
      query.stop()
    }
    Thread.sleep(5000)
  }
} catch {
  case e: Exception =>
    println(s"Streaming query failed: ${e.getMessage}")
    // Implement retry logic or alerting
}
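Beyond in-loop monitoring, a common pattern is to restart a failed query with backoff; because the checkpoint records committed offsets, a restarted query resumes where it left off. A minimal pure-Python sketch (`start_query` is a hypothetical callable that runs the query to completion or raises on failure):

```python
import time

def run_with_retries(start_query, max_retries=3, backoff_sec=1.0):
    """Run `start_query`, restarting it up to `max_retries` times on
    failure, with exponential backoff between attempts."""
    for attempt in range(max_retries + 1):
        try:
            return start_query()
        except Exception:
            if attempt == max_retries:
                raise  # exhausted retries; surface the failure
            time.sleep(backoff_sec * (2 ** attempt))
```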
Next Steps
Change Data Feed - query table changes with CDF
Quick Start - return to basic usage examples