apache-spark, hive, google-cloud-dataproc, apache-iceberg

Iceberg table snapshots not expired


I'm using Dataproc Metastore, Dataproc Batch and PySpark. Although I'm on GCP, I believe this is a general Apache Iceberg question.

I ran my Spark job, created an Iceberg trips table with snapshot expiration set to 1 hour (history.expire.max-snapshot-age-ms=3600000), and wrote the contents of CSV files into the table.

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, LongType,
                               FloatType, DoubleType, StringType)

conf = (
    SparkConf()
    .setAppName('read_from_iceberg')
    .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
    .set('spark.sql.catalog.spark_catalog.type', 'hive')
    .set('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
    .set('spark.sql.catalog.dev.type', 'hive')
    .set('spark.sql.warehouse.dir', lake_bucket)  # lake_bucket: GCS warehouse path
)
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

schema = StructType([
    StructField("vendor_id", LongType(), True),
    StructField("trip_id", LongType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True)
])
# Create database if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS dev.lakehouse")

# Create the table if it doesn't exist, using an empty DataFrame with the schema above.
df = spark.createDataFrame([], schema)
(df.writeTo("dev.lakehouse.trips")
 .partitionedBy("vendor_id")
 .tableProperty('format-version', '2')
 .tableProperty("history.expire.max-snapshot-age-ms", "3600000")
 .createOrReplace())

df3 = spark.read.option("delimiter", ",").schema(schema).option("header", True).csv(
    "gs://my-super-bucket/csv-input/bulk/*")

df3.write.mode('append').format("iceberg").insertInto("dev.lakehouse.trips")
 

I repeated the batch execution several times and, as a result of the writes, the table now holds 450 million rows.

+---------+
| count(1)|
+---------+
|450000000|
+---------+

Now I would like to see the table history.

spark.sql("SELECT * FROM dev.lakehouse.trips4.history").show()

And the result is the following:

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2023-11-08 09:05:...|3365635318905728444|               null|               true|
|2023-11-08 09:07:...|8818173850344394660|3365635318905728444|               true|
|2023-11-08 09:18:...|7080281147456503211|8818173850344394660|               true|
|2023-11-08 09:26:...|1124704647664806615|7080281147456503211|               true|
|2023-11-08 09:43:...|1410379929547885531|1124704647664806615|               true|
|2023-11-08 09:44:...|2828602979849095888|1410379929547885531|               true|
|2023-11-08 11:59:...|3836167930220261494|2828602979849095888|               true|
|2023-11-08 12:09:...|7872321137982208330|3836167930220261494|               true|
+--------------------+-------------------+-------------------+-------------------+

Although the expiration is set to one hour, I still see all the older snapshots that were supposed to be removed.

I know that I can always use

spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)")

and this will remove snapshots older than the given timestamp, but shouldn't this happen automatically?


Solution

  • Iceberg does not expire table snapshots (or delete their data files) automatically; expiration must be triggered explicitly. The docs say:

    Snapshots accumulate until they are expired by the expireSnapshots operation. Regularly expiring snapshots is recommended to delete data files that are no longer needed.

    The config history.expire.max-snapshot-age-ms is only the

    Default max age of snapshots to keep on the table and all of its branches while expiring snapshots.

    In other words, it supplies the default cut-off age used when an expiration actually runs; setting it does not schedule or trigger expiration by itself.

    The spark.sql("CALL dev.system.expire_snapshots('dev.lakehouse.trips4', TIMESTAMP '2023-11-08 11:00:00.000', 1)") call you show is the expire_snapshots procedure from the Iceberg Spark SQL extensions; running it (or an equivalent maintenance job) on a regular schedule is the expected way to expire snapshots. A sketch is shown below.
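
    Below is a minimal sketch of what that maintenance step could look like as its own scheduled job (for example another Dataproc Batch). The catalog settings mirror the 'dev' catalog from the question; the table name, cut-off timestamp and retain_last value are taken from your CALL and are only illustrative.

    from pyspark.sql import SparkSession

    # Assumes the same Hive-backed 'dev' Iceberg catalog as in the question.
    spark = (
        SparkSession.builder
        .appName('expire_trips_snapshots')
        .config('spark.sql.extensions',
                'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .config('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
        .config('spark.sql.catalog.dev.type', 'hive')
        .enableHiveSupport()
        .getOrCreate()
    )

    # Expire snapshots older than the given timestamp, keeping at least one snapshot.
    # If older_than is omitted, the expiration falls back to the table's
    # history.expire.max-snapshot-age-ms property (verify against your Iceberg version).
    spark.sql("""
        CALL dev.system.expire_snapshots(
            table => 'lakehouse.trips4',
            older_than => TIMESTAMP '2023-11-08 11:00:00.000',
            retain_last => 1
        )
    """).show()

    Scheduling something like this (Cloud Composer, cron, or a recurring Dataproc Batch) is what the documentation's "regularly expiring snapshots" recommendation amounts to; the table property on its own only supplies the default age used by such a run.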