Tags: sql, pyspark, aws-glue, cdc, apache-iceberg

Read incremental data from Iceberg tables using Spark SQL


I am trying to read incremental data between two snapshots.

I have the last processed snapshot ID (my day-0 load), and below is the code snippet I use to read the incremental data:

# Read rows appended between the two snapshots
# (start-snapshot-id is exclusive, end-snapshot-id is inclusive)
incremental_df = spark.read.format("iceberg") \
    .option("start-snapshot-id", str(first_snapshot_id)) \
    .option("end-snapshot-id", str(last_snapshot_id)) \
    .load(table)

If I query the snapshots on Athena with the query below, I get this result (just the relevant snapshots, selected using LIMIT):

SELECT committed_at, snapshot_id, parent_id, operation, summary FROM "iceberg_table.$snapshots"

#   committed_at                  snapshot_id          parent_id            operation
1   2025-04-16 07:07:25.469 UTC   657179279488385900   5188970741575144205  append
2   2025-04-16 07:24:26.655 UTC   8635633269075484497  657179279488385900   overwrite

summary (row 1): {changed-partition-count=1, added-data-files=1, total-equality-deletes=6, added-records=2, trino_query_id=20250416_070724_00159_5zcft, total-position-deletes=2, added-files-size=2047, total-delete-files=8, total-files-size=92268, total-records=4130, total-data-files=10}
summary (row 2): {added-data-files=1, added-position-deletes=1, total-equality-deletes=6, added-records=1, trino_query_id=20250416_072424_00031_85erx, added-position-delete-files=1, added-delete-files=1, total-records=4131, changed-partition-count=1, total-position-deletes=3, added-files-size=2950, total-delete-files=9, total-files-size=95218, total-data-files=11}
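For reference, the same metadata can also be read from inside the Spark job instead of Athena, which is handy for obtaining first_snapshot_id / last_snapshot_id programmatically (a sketch, assuming the table is registered in Spark's catalog as db.iceberg_table):

# Read the snapshots metadata table through Spark
snapshots_df = spark.read.format("iceberg").load("db.iceberg_table.snapshots")
snapshots_df.select("committed_at", "snapshot_id", "parent_id", "operation") \
    .orderBy("committed_at") \
    .show(truncate=False)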

In the first snapshot I added two new records, and in the second snapshot I updated an existing record (if I query the table at that snapshot, the change is reflected). What I am expecting is:

  1. A total of 3 records in my incremental_df (it is giving only the 2 inserted records).
  2. How do I also read the updated record (which I need for dimension tables)?
  3. Is this the correct way to read Iceberg tables? (I am not writing this data back in Iceberg format; I write it as normal Hive-format partitioned data.)

Solution

  • Unfortunately, Spark's incremental read of Iceberg tables is limited to append-only data. From the docs:

    Currently gets only the data from append operation. Cannot support replace, overwrite, delete operations. Incremental read works with both V1 and V2 format-version. Incremental read is not supported by Spark's SQL syntax.

    In other words, Spark's incremental read only picks up appends and ignores updates and deletes, while regular Spark SQL syntax doesn't support snapshot-based incremental reads at all.

    So to incrementally process mutable data, you have to implement custom logic that selects added/modified rows based not on snapshots but on the data itself, for example using update timestamps, monotonically increasing IDs, offsets, etc.; see the sketch below.
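    For illustration, here is a minimal sketch of timestamp-based incremental processing. It assumes the table carries an updated_at column that every writer maintains, and that the watermark from the previous run is persisted somewhere; the table name, column name and watermark value are all hypothetical:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    table = "glue_catalog.db.iceberg_table"  # hypothetical table identifier
    # Hypothetical watermark: the max updated_at processed by the previous run,
    # persisted externally (control table, file, job bookmark, ...).
    last_watermark = "2025-04-16 07:00:00"

    # Read the current table state and keep only rows inserted OR updated since
    # the watermark. Unlike snapshot-based incremental read, this also captures
    # modified rows, which is what dimension tables need.
    incremental_df = (
        spark.read.format("iceberg")
        .load(table)
        .where(F.col("updated_at") > F.lit(last_watermark))
    )

    # Remember the new high-water mark for the next run.
    new_watermark = incremental_df.agg(F.max("updated_at")).first()[0]

    Note that hard deletes still need separate handling (for example soft-delete flags), because a deleted row simply disappears from the current table state and will never match the filter.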