java apache-flink flink-streaming hadoop-streaming

How to change the file name to the updated date in a Flink job


I have a Flink job that streams data to Azure using Hadoop FS. Currently I'm able to push the data and create a new file, but I want to roll to a new file when the date changes (like from 2025-03-12 to 2025-04-13), so that a new file gets created, the content is pushed to the new file, and the old file is changed from in-progress to completed. I have tried updating the name, but the date in it is not updated to the new date. Can anyone help with how to change the file name when the date changes?


Solution

  • It sounds like you want a custom RollingPolicy that triggers a roll when the date changes. To do this, extend the abstract CheckpointRollingPolicy and implement its shouldRollOnEvent method so that it returns true when the date changes (its other abstract method, shouldRollOnProcessingTime, can simply return false). See the FileSystem connector documentation for more details on how to use a custom RollingPolicy.

    Note that this requires events to be strictly ordered by time; otherwise you might trigger a roll before you have received all the events for a given day. There is also an issue when the rolling policy is (re)started: it doesn't know the current date, so you might miss a roll that you should have done.

    Another (probably better) approach is to bucket by day. If your rolling policy rolls on checkpoints, you end up with a date-stamped directory (the bucket name) containing 1..n files, all holding events for that one day.
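A minimal sketch of the custom rolling policy approach, assuming Flink 1.x package names (check them against your version) and a single bucket per subtask (e.g. BasePathBucketAssigner). To my understanding one policy instance is shared by all buckets of a subtask, so a single "last date" field only makes sense with one bucket. `DateChangeRollingPolicy` and `TimestampExtractor` are hypothetical names; you supply how to read an epoch-millis timestamp from your own record type. The last-seen date is kept in a transient field, which is exactly why the restart caveat above applies.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

/**
 * Sketch: rolls the in-progress part file when a record's event date differs from the
 * date of the previously seen record. As a CheckpointRollingPolicy it also rolls on every checkpoint.
 */
public class DateChangeRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {

    /** Hypothetical hook: returns the record's event timestamp in epoch milliseconds. */
    public interface TimestampExtractor<T> extends Serializable {
        long extractMillis(T element);
    }

    private final TimestampExtractor<IN> extractor;
    private final ZoneId zone; // time zone that defines where one day ends and the next begins

    // Not checkpointed: after a (re)start this is null, so the first record can never
    // trigger a roll (the restart caveat mentioned in the answer).
    private transient LocalDate lastDate;

    public DateChangeRollingPolicy(TimestampExtractor<IN> extractor, ZoneId zone) {
        this.extractor = extractor;
        this.zone = zone;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
        LocalDate date = Instant.ofEpochMilli(extractor.extractMillis(element)).atZone(zone).toLocalDate();
        boolean dateChanged = lastDate != null && !date.equals(lastDate);
        lastDate = date;
        // Assumes time-ordered input: a late record from the previous day would trigger extra rolls.
        return dateChanged;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        // Never roll on processing time; only on date change and on checkpoint.
        return false;
    }
}
```

You would pass it to the sink builder with something like `.withRollingPolicy(new DateChangeRollingPolicy<>(e -> e.getTimestamp(), ZoneOffset.UTC))`, where `getTimestamp()` stands in for whatever accessor your record type has. A file closed by a roll becomes pending and is only moved to finished once the next checkpoint completes, so checkpointing must be enabled for the in-progress to completed transition to happen.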
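For the bucket-by-day approach, Flink's built-in DateTimeBucketAssigner (e.g. `new DateTimeBucketAssigner<>("yyyy-MM-dd")`) buckets by processing time. If the day should come from the event timestamp instead, a small custom BucketAssigner can do it. The sketch below assumes Flink 1.x with the FileSink from flink-connector-files, string records, and event timestamps assigned upstream; `EventDateBucketAssigner` and `buildSink` are hypothetical names. Paired with OnCheckpointRollingPolicy (checkpointing must be enabled), each day's directory ends up with 1..n finished files.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

/** Sketch: assigns each record to a "yyyy-MM-dd" bucket (sub-directory) based on its event timestamp. */
public class EventDateBucketAssigner<IN> implements BucketAssigner<IN, String> {

    // static, so it is not part of the serialized assigner (DateTimeFormatter is not Serializable)
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    private final ZoneId zone;

    public EventDateBucketAssigner(ZoneId zone) {
        this.zone = zone;
    }

    @Override
    public String getBucketId(IN element, Context context) {
        // Event-time timestamp attached upstream (e.g. by a WatermarkStrategy); null if none was
        // assigned, in which case this sketch falls back to processing time.
        Long ts = context.timestamp();
        long millis = (ts != null) ? ts : context.currentProcessingTime();
        return DAY.format(Instant.ofEpochMilli(millis).atZone(zone));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    /** Hypothetical helper: row-format sink writing basePath/yyyy-MM-dd/part-... files, rolled on every checkpoint. */
    public static FileSink<String> buildSink(String basePath, ZoneId zone) {
        return FileSink
                .forRowFormat(new Path(basePath), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new EventDateBucketAssigner<String>(zone))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}
```

With this layout the date lives in the bucket directory name rather than in the part file name; OutputFileConfig only lets you set a fixed part prefix and suffix, so it cannot carry a changing date by itself.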