Our project requires daily full loads, and we retain these versions for future queries. We implemented Hudi to maintain 6 years of data with the following setup:
"hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
"hoodie.cleaner.hours.retained": "52560", # 24 hours * 365 days * 6 years
After about 30 runs, we observed a compromise in data integrity: on read, the data versions get mixed together and produce duplicate records, causing significant issues in our data lake (S3), since these tables are consumed by other scripts.
To solve these problems, we adjusted the minimum and maximum number of commits to keep, applying the following configuration, as referenced in issue #7600:
"hoodie.keep.max.commits": "2300", # (365 days * 6 years) + delta
"hoodie.keep.min.commits": "2200", # (365 days * 6 years) + delta2
However, this solution becomes excessively costly over time. We simulated running the scripts repeatedly, partitioning by day, and both the runtime and the write cost grew significantly even for a small table covering one year of data: the average runtime of a script went from 00m:25s to 02m:30s. As we need to keep 6 years of history, this processing time will only scale further.
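For reference, a minimal way to collect such per-run timings is to wrap the daily write in a timer. This is only an illustrative sketch, not our production measurement; timed_write is a hypothetical helper that mirrors the write performed in the reproduction loop below:

import time

def timed_write(df, hudi_options, base_path):
    # Hypothetical helper: performs the same Hudi append write as the
    # reproduction loop below and returns the elapsed wall-clock time.
    start = time.perf_counter()
    df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
    return time.perf_counter() - start

# Example usage inside the reproduction loop:
#   elapsed = timed_write(dataframe, hudi_options, basePath)
#   print(f"{date:%Y-%m-%d}: write took {elapsed:.1f}s")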
Replication
Follow the instructions below to reproduce the behavior:
from datetime import datetime, timedelta

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, to_date

spark = SparkSession.builder.getOrCreate()
basePath = "s3://your-bucket/example_hudi"  # placeholder path for the Hudi table

data = [
    Row(SK=-6698625589789238999, DSC='A', COD=1),
    Row(SK=8420071140774656230, DSC='B', COD=2),
    Row(SK=-8344648708406692296, DSC='C', COD=4),
    Row(SK=504019808641096632, DSC='D', COD=5),
    Row(SK=-233500712460350175, DSC='E', COD=6),
    Row(SK=2786828215451145335, DSC='F', COD=7),
    Row(SK=-8285521376477742517, DSC='G', COD=8),
    Row(SK=-2852032610340310743, DSC='H', COD=9),
    Row(SK=-188596373586653926, DSC='I', COD=10),
    Row(SK=890099540967675307, DSC='J', COD=11),
    Row(SK=72738756111436295, DSC='K', COD=12),
    Row(SK=6122947679528380961, DSC='L', COD=13),
    Row(SK=-3715488255824917081, DSC='M', COD=14),
    Row(SK=7553013721279796958, DSC='N', COD=15)
]
dataframe = spark.createDataFrame(data)
hudi_options = {
    "hoodie.table.name": "example_hudi",
    "hoodie.datasource.write.recordkey.field": "SK",
    "hoodie.datasource.write.table.name": "example_hudi",
    "hoodie.datasource.write.operation": "insert_overwrite_table",
    "hoodie.datasource.write.partitionpath.field": "LOAD_DATE",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "example_hudi",
    "hoodie.datasource.hive_sync.partition_fields": "LOAD_DATE",
    "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
    "hoodie.cleaner.hours.retained": "52560",
    "hoodie.keep.max.commits": "2300",
    "hoodie.keep.min.commits": "2200",
    "hoodie.datasource.write.precombine.field": "",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
}
date = datetime.strptime('2023-06-02', '%Y-%m-%d')        # Initial date (yyyy-mm-dd)
final_date = datetime.strptime('2023-11-01', '%Y-%m-%d')  # Final date (yyyy-mm-dd)
while date <= final_date:
    dataframe = dataframe.withColumn("LOAD_DATE", to_date(lit(date.strftime('%Y-%m-%d'))))
    dataframe.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(basePath)
    date += timedelta(days=1)
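After the loop finishes, the duplication described above can be checked by reading the table back and counting rows per record key within each partition. This is a minimal check, assuming the same spark session and basePath as above and that each SK should appear exactly once per LOAD_DATE:

from pyspark.sql.functions import col

# Read the table back and look for record keys that appear more than once
# within the same LOAD_DATE partition.
snapshot = spark.read.format("hudi").load(basePath)
duplicates = (
    snapshot.groupBy("LOAD_DATE", "SK")
    .count()
    .filter(col("count") > 1)
)
duplicates.show(truncate=False)
print(f"duplicate (LOAD_DATE, SK) pairs: {duplicates.count()}")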
Expected behavior
We expected data versions to be retained and cleaned according to the hoodie.cleaner.policy KEEP_LATEST_BY_HOURS attribute, without duplicate records appearing when the table is read.
Based on the discussion in this GitHub issue, the performance trade-off is expected given the specific Hudi configuration and usage pattern.
When we modify the minimum and maximum commit values, Hudi has to load that number of commits to perform index lookups, a step essential for handling updates. With the smaller (default) commit counts, Hudi only loaded the most recent 30 commits for index lookup; consequently, if an update targeted a record ingested more than 30 commits earlier, duplicate entries could emerge.
However, as we raise the minimum and maximum commit counts, Hudi is forced to load more commits for index lookup, which increases execution time because of the shuffling involved. This is the trade-off we have to weigh: larger commit counts preserve data integrity, but they also extend execution times.
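To make the trade-off concrete, the two regimes we compared roughly correspond to the retention settings below. This is an illustrative summary only; 20/30 are, to our understanding, Hudi's default archival limits, and 2200/2300 are the raised limits from this report:

# Default archival window (to our understanding): fast index lookups, but
# updates to records older than ~30 commits can slip past the index and
# create duplicates.
default_commit_retention = {
    "hoodie.keep.min.commits": "20",
    "hoodie.keep.max.commits": "30",
}

# Raised limits used above: index lookups cover ~6 years of daily commits,
# avoiding duplicates, but each write loads far more timeline metadata and
# takes longer.
long_history_commit_retention = {
    "hoodie.keep.min.commits": "2200",
    "hoodie.keep.max.commits": "2300",
}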