I am trying to read incremental data between two snapshots of an Iceberg table.
I have the last processed snapshot (my day-0 load), and below is my code snippet for reading the incremental data:
incremental_df = spark.read.format("iceberg") \
.option("start-snapshot-id", str(first_snapshot_id)) \
.option("end-snapshot-id", str(last_snapshot_id)) \
.load(table)
If I query the snapshots on Athena using the query below, I get this result (only the relevant snapshots, selected using a limit):
SELECT committed_at,snapshot_id,parent_id, operation,summary FROM "iceberg_table.$snapshots"
# 1
committed_at: 2025-04-16 07:07:25.469 UTC
snapshot_id:  657179279488385900
parent_id:    5188970741575144205
operation:    append
summary:      {changed-partition-count=1, added-data-files=1, total-equality-deletes=6, added-records=2, trino_query_id=20250416_070724_00159_5zcft, total-position-deletes=2, added-files-size=2047, total-delete-files=8, total-files-size=92268, total-records=4130, total-data-files=10}

# 2
committed_at: 2025-04-16 07:24:26.655 UTC
snapshot_id:  8635633269075484497
parent_id:    657179279488385900
operation:    overwrite
summary:      {added-data-files=1, added-position-deletes=1, total-equality-deletes=6, added-records=1, trino_query_id=20250416_072424_00031_85erx, added-position-delete-files=1, added-delete-files=1, total-records=4131, changed-partition-count=1, total-position-deletes=3, added-files-size=2950, total-delete-files=9, total-files-size=95218, total-data-files=11}
In the first snapshot I added two new records, and in the other snapshot I updated an existing record (if I query the table at that snapshot, the change is reflected). What I am expecting is to see both changes in the incremental read.
Unfortunately, Spark incremental read of Iceberg tables is limited to append-only data. From the docs:

Currently gets only the data from `append` operation. Cannot support `replace`, `overwrite`, `delete` operations. Incremental read works with both V1 and V2 format-version. Incremental read is not supported by Spark's SQL syntax.
In other words, Spark incremental read ignores updates and deletes, whereas regular Spark SQL doesn't support snapshot-based incremental read.
So to incrementally process mutable data, you have to implement custom logic that selects added/modified rows based not on snapshots but on your data itself, for example using update timestamps, IDs, or offsets.
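One common pattern is watermarking on an update-timestamp column. Here is a minimal sketch in plain Python for clarity; the column name `updated_at` and the watermark bookkeeping are assumptions, not part of Iceberg. In Spark the same filter would be expressed as `.where(col("updated_at") > watermark)` on the loaded table.

```python
from datetime import datetime

def read_increment(rows, watermark):
    """Return rows modified after `watermark`, plus the advanced watermark.

    `rows` stands in for the current state of the table; each row is
    assumed to carry an `updated_at` column maintained by the writer.
    """
    changed = [r for r in rows if r["updated_at"] > watermark]
    # Advance the watermark to the newest timestamp we processed,
    # so the next run only picks up later changes.
    new_watermark = max((r["updated_at"] for r in changed), default=watermark)
    return changed, new_watermark

# Table state after both snapshots: id 1 was updated, id 3 was appended.
rows = [
    {"id": 1, "updated_at": datetime(2025, 4, 16, 7, 24)},  # updated after day 0
    {"id": 2, "updated_at": datetime(2025, 4, 16, 7, 7)},   # unchanged day-0 row
    {"id": 3, "updated_at": datetime(2025, 4, 16, 7, 24)},  # appended after day 0
]

changed, wm = read_increment(rows, watermark=datetime(2025, 4, 16, 7, 10))
# Unlike the snapshot-based incremental read, this picks up updated rows too.
```

Unlike `start-snapshot-id`/`end-snapshot-id`, this approach sees overwrites, but it cannot detect hard deletes; for those you would need soft-delete flags or a diff against the previous load.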