scala, apache-spark, apache-hudi

How to set custom hudi field for _hoodie_commit_time metadata column?


By default, Hudi bases its ingestion timeline on the current time. I want to change this behavior and use my own datetime field during ingestion. I use the Hudi table to store only the latest state (the last commits). There will be cases where data arrives with a delay, but such data should not overwrite the previous record if that record carries a more recent update. Every record has a last_update_time datetime column that says when the update was made.

Example:

  1. Day 1 received data:
+---+-----+---------------------------+
|id |value|last_update_time           |
+---+-----+---------------------------+
|100|a    |2022-11-14T13:51:39.340396Z|
|101|b    |2022-11-14T12:14:58.597216Z|
|103|c    |2022-11-14T12:14:58.597216Z|
+---+-----+---------------------------+
  2. Day 2 received data:
+---+-----+---------------------------+
|id |value|last_update_time           |
+---+-----+---------------------------+
|100|a1   |2022-11-25T13:51:39.340396Z| <- update (should overwrite the previous value)
|101|b1   |2022-11-12T12:14:58.597216Z| <- delayed update (should not overwrite the previous value)
|104|d1   |2022-11-25T12:14:58.597216Z| <- new record (should be inserted)
+---+-----+---------------------------+

Expected output I would like to get after the insert:

+---+-----+---------------------------+
|id |value|last_update_time           |
+---+-----+---------------------------+
|100|a1   |2022-11-25T13:51:39.340396Z|
|101|b    |2022-11-14T12:14:58.597216Z|
|103|c    |2022-11-14T12:14:58.597216Z|
|104|d1   |2022-11-25T12:14:58.597216Z|
+---+-----+---------------------------+

The output I am getting:

+---+-----+---------------------------+
|id |value|last_update_time           |
+---+-----+---------------------------+
|100|a1   |2022-11-25T13:51:39.340396Z|
|101|b1   |2022-11-12T12:14:58.597216Z|
|103|c    |2022-11-14T12:14:58.597216Z|
|104|d1   |2022-11-25T12:14:58.597216Z|
+---+-----+---------------------------+

Writer options I am using:

"compression": "snappy",
"hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.partitionpath.field": "",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.precombine.field": "last_update_time",
"hoodie.datasource.write.row.writer.enable": "true",
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.fail.on.timeline.archiving": "false",
"hoodie.write.lock.zookeeper.url": "zookeeper",
"hoodie.write.lock.zookeeper.port": "2181",
"hoodie.write.lock.num_retries": "2",
"hoodie.write.lock.zookeeper.lock_key": "test",
"hoodie.write.lock.zookeeper.base_path": "/lock/hudi/test"
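For reference, the options above are typically collected into a Scala `Map` and handed to the DataFrame writer via `.options(...)`. This is a sketch of that pattern (only the core write options are shown; the `df`, table name, and base path in the comment are assumptions, not part of the question):

```scala
// A subset of the writer options from the question, as a Scala Map.
val hudiOptions: Map[String, String] = Map(
  "hoodie.datasource.write.table.type"        -> "COPY_ON_WRITE",
  "hoodie.datasource.write.recordkey.field"   -> "id",
  "hoodie.datasource.write.precombine.field"  -> "last_update_time",
  "hoodie.datasource.write.partitionpath.field" -> ""
)

// With a SparkSession and a DataFrame `df` in scope, this would be used as:
//   df.write.format("hudi").options(hudiOptions).mode("append").save(basePath)
println(hudiOptions("hoodie.datasource.write.precombine.field"))
```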

Solution

  • You cannot change the Hudi commit time: by definition, it is the instant at which the commit was created, and changing it would corrupt the table timeline.

    However, Hudi provides a way to solve your problem. It supports the upsert operation (used by default), which overwrites existing data when there is a match on the Hudi record key, and inserts new records when there is not.
    By default, Hudi uses org.apache.hudi.common.model.OverwriteWithLatestAvroPayload as the payload class, which overwrites the existing record with the new one without comparing the precombine field values. In your case, you want to compare the last_update_time field before overwriting, so you need to use it as the precombine key (as you are already doing) and switch the payload class to org.apache.hudi.common.model.DefaultHoodieRecordPayload.

    You can solve your problem by just adding these two configurations:

    "hoodie.datasource.write.operation": "upsert", // added only to guarantee upsert is used, in case the default operation changes in a future release
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload"
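To make the effect of the ordering-field comparison concrete, here is a plain-Scala sketch (not Hudi's actual implementation) of the merge rule that DefaultHoodieRecordPayload applies with last_update_time as the precombine field. The `Row` case class and `merge` function are illustrative names, not Hudi APIs; ISO-8601 UTC timestamps sort correctly as strings, so a string comparison stands in for the ordering-value comparison:

```scala
// Illustrative record shape matching the question's table.
case class Row(id: Int, value: String, lastUpdateTime: String)

// Sketch of the precombine rule: the incoming record replaces the stored one
// only when its ordering value (last_update_time) is not older.
def merge(existing: Row, incoming: Row): Row =
  if (incoming.lastUpdateTime >= existing.lastUpdateTime) incoming else existing

val day1 = Map(
  100 -> Row(100, "a", "2022-11-14T13:51:39.340396Z"),
  101 -> Row(101, "b", "2022-11-14T12:14:58.597216Z"),
  103 -> Row(103, "c", "2022-11-14T12:14:58.597216Z")
)
val day2 = Seq(
  Row(100, "a1", "2022-11-25T13:51:39.340396Z"), // newer -> overwrites
  Row(101, "b1", "2022-11-12T12:14:58.597216Z"), // delayed -> old value kept
  Row(104, "d1", "2022-11-25T12:14:58.597216Z")  // new key -> inserted
)

// Upsert day2 into day1: merge on key match, insert otherwise.
val upserted = day2.foldLeft(day1) { (table, rec) =>
  table.updated(rec.id, table.get(rec.id).map(merge(_, rec)).getOrElse(rec))
}
upserted.toSeq.sortBy(_._1).foreach { case (_, r) => println(s"${r.id} ${r.value}") }
// prints: 100 a1 / 101 b / 103 c / 104 d1 -- the expected output from the question
```

With the two configurations above, Hudi performs this comparison per record key at write time, so the delayed update for id 101 no longer overwrites the newer stored value.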