I'm having an issue writing more than 1 file per partition to Iceberg.
Here is my write command:
df.repartition(
    partitions, col("exposure_id"), col("event_date"), col("advertising_id"))
  .sortWithinPartitions(col("advertising_id"), col("timestamp"))
  .writeTo(fullTableName)
  .append()
This works if my data only has a single partition to write to, but if my data contains multiple partitions, it fails with this error:
Caused by: java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'exposure_id=10/event_date=2024-06-28' in spec [
1000: exposure_id: identity(13)
1001: event_date: identity(14)
]
Removing advertising_id from the repartition statement makes it work, but performance is terrible: each of my partitions holds a lot of data, and all of it ends up on a single executor per partition.
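For reference, my reading of the error's "cluster the incoming records" hint is that the extra repartition column should be fine as long as each task's rows are also sorted by the table's partition columns first. Something like the sketch below is what I mean (the column order is my assumption, and I haven't verified it at scale):

df.repartition(
    partitions, col("exposure_id"), col("event_date"), col("advertising_id"))
  // sort by the table's partition columns first so each task's rows are
  // grouped by Iceberg partition, then by the ordering the job actually needs
  .sortWithinPartitions(
    col("exposure_id"), col("event_date"), col("advertising_id"), col("timestamp"))
  .writeTo(fullTableName)
  .append()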
How can I write multiple files to Iceberg per partition without it freaking out?
In order to write more than one file per partition like this (i.e. to let a single task write to several partitions without the records being clustered), fanout writing needs to be enabled on the table, for example when the table is created:
df.writeTo(table)
  .tableProperty("location", location)
  .tableProperty("write.spark.fanout.enabled", "true")
  .partitionedBy(cols)
  .using("iceberg")
  .create()
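As far as I know the property doesn't have to be set only at creation time. For an existing table it can be set afterwards with Spark SQL, or turned on for a single write via the writer option (option name as I recall it from the Iceberg Spark docs; table name below is a placeholder):

// Enable fanout on an existing table
spark.sql("ALTER TABLE db.my_table SET TBLPROPERTIES ('write.spark.fanout.enabled' = 'true')")

// Or enable it just for this write via the writer option
df.repartition(
    partitions, col("exposure_id"), col("event_date"), col("advertising_id"))
  .sortWithinPartitions(col("advertising_id"), col("timestamp"))
  .writeTo(fullTableName)
  .option("fanout-enabled", "true")
  .append()

With fanout enabled, the writer keeps a file open per partition it encounters in a task, so the incoming records no longer need to be clustered by partition.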