Starting with release 0.5.0, Delta Sharing supports querying the Change Data Feed (CDF) of shared tables. This lets you track row-level changes (inserts, updates, and deletes) between table versions.
CDF must be enabled by the data provider on the source Delta table. Once it is enabled and the table is shared through Delta Sharing, recipients can query the change feed just as they would on a regular Delta table.
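For reference, the provider side enables CDF through a Delta table property. This is a standard Delta Lake DDL statement (the table name below is illustrative); recipients cannot run it on a shared table:

```sql
-- Provider side: enable the change data feed on the source Delta table
ALTER TABLE my_source_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
```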
```scala
// Read changes from version 3 to version 10
val changesDF = spark.read
  .format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("startingVersion", "3")
  .option("endingVersion", "10")
  .load(tablePath)
```
Single version changes
```python
# Read changes from only version 5
changes_df = spark.read \
    .format("deltaSharing") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", "5") \
    .option("endingVersion", "5") \
    .load(table_path)
```
Range of versions
```python
# Read changes from version 10 to version 20
changes_df = spark.read \
    .format("deltaSharing") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", "10") \
    .option("endingVersion", "20") \
    .load(table_path)
```
From version to latest
```python
# Read changes from version 15 to the latest version
changes_df = spark.read \
    .format("deltaSharing") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", "15") \
    .load(table_path)
```
```python
# Filter for only inserted rows
inserts = changes_df.filter(changes_df._change_type == "insert")
inserts.show()
```
The `insert` change type represents rows that were newly added to the table.
```python
# Get before and after images for updates
update_before = changes_df.filter(changes_df._change_type == "update_preimage")
update_after = changes_df.filter(changes_df._change_type == "update_postimage")

print("Before update:")
update_before.show()
print("After update:")
update_after.show()
```
- `update_preimage`: the row's state before the update
- `update_postimage`: the row's state after the update
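To make the pairing concrete, here is a small plain-Python sketch (not Spark; the rows, key, and field names are illustrative) of how a single update appears in the change feed as a preimage/postimage pair sharing one commit version, and how the two images can be diffed:

```python
# Two change-feed rows produced by one UPDATE of row id=1 (illustrative data)
change_rows = [
    {"id": 1, "email": "old@example.com",
     "_change_type": "update_preimage", "_commit_version": 7},
    {"id": 1, "email": "new@example.com",
     "_change_type": "update_postimage", "_commit_version": 7},
]

def changed_fields(pre, post):
    """Return the data fields whose values differ between the two images,
    ignoring the CDF metadata columns (those starting with '_')."""
    return {k for k in pre if not k.startswith("_") and pre[k] != post[k]}

pre = next(r for r in change_rows if r["_change_type"] == "update_preimage")
post = next(r for r in change_rows if r["_change_type"] == "update_postimage")
print(changed_fields(pre, post))  # {'email'}
```

In Spark you would typically join the preimage and postimage DataFrames on the table's key and `_commit_version` to achieve the same comparison at scale.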
```python
# Filter for deleted rows
deletes = changes_df.filter(changes_df._change_type == "delete")
deletes.show()
```
```python
# Process only new and changed records since last run
last_processed_version = 10  # Retrieve from checkpoint/state store

changes_df = spark.read \
    .format("deltaSharing") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", str(last_processed_version + 1)) \
    .load(table_path)

# Process inserts and updates
new_and_updated = changes_df.filter(
    changes_df._change_type.isin(["insert", "update_postimage"])
)

# Apply transformations and write to target
new_and_updated \
    .select("customer_id", "name", "email", "updated_at") \
    .write \
    .mode("append") \
    .parquet("/output/customer_updates")
```
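The `last_processed_version` above is hard-coded for illustration; in practice it must be persisted between runs. A minimal sketch of such a checkpoint store, assuming a plain JSON file (the path and key name are hypothetical, not part of Delta Sharing):

```python
import json
import os

CHECKPOINT_PATH = "/tmp/cdf_checkpoint.json"  # hypothetical location

def load_last_processed_version(path=CHECKPOINT_PATH, default=0):
    """Return the last processed version, or a default on the first run."""
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return json.load(f)["last_processed_version"]

def save_last_processed_version(version, path=CHECKPOINT_PATH):
    """Persist the highest version processed so the next run can resume."""
    with open(path, "w") as f:
        json.dump({"last_processed_version": version}, f)

save_last_processed_version(10)
print(load_last_processed_version())  # 10
```

After a successful run you would call `save_last_processed_version` with the highest `_commit_version` you processed, and only advance the checkpoint once the write to the target has succeeded.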
When processing updates, you often need to handle preimage/postimage pairs:
```python
from pyspark.sql.functions import col

# Get only the final state after updates
changes_df = spark.read \
    .format("deltaSharing") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", "0") \
    .load(table_path)

# Keep inserts, update_postimage (final state), and deletes,
# dropping the update_preimage rows
final_state = changes_df.filter(
    col("_change_type").isin(["insert", "update_postimage", "delete"])
)
final_state.show()
```
```scala
try {
  val changesDF = spark.read
    .format("deltaSharing")
    .option("readChangeFeed", "true")
    .option("startingVersion", "3")
    .option("endingVersion", "10")
    .load(tablePath)
  changesDF.show()
} catch {
  case e: Exception if e.getMessage.contains("Change data feed") =>
    println("CDF is not enabled for this table. Contact the data provider.")
  case e: Exception if e.getMessage.contains("version") =>
    println("Invalid version specified. Check available versions.")
  case e: Exception =>
    println(s"Error reading change feed: ${e.getMessage}")
}
```
Common Issues:
- CDF not enabled: the data provider must enable CDF on the source table.
- Invalid versions: ensure the specified versions exist in the table history.
- Version gaps: some versions in a range may contain no changes if no data operations occurred in them.