I am using Flink streaming to read input from several sources, including files. My aim is to trigger some calculations periodically (in processing time) and, when the end of the file is reached, to trigger the final result. My processing sub-topology looks like this:
myGenerator
.generateData()
.map(...)
.keyBy(...)
.process(new TriggerFunction(timeout));
One of my generator sources can be a file with bounded data, e.g.
env.readFile(inputFormat, filename, FileProcessingMode.PROCESS_ONCE, interval, typeInfo);
Therefore I am wondering whether there is a mechanism to capture the event, sent by the source operator, which states that the end of input has been reached and no more events are expected, signaling the teardown of the topology.
As far as I understand, when my source reaches the end of its input (the file), it signals the end of processing to the downstream operators. I therefore have no chance to trigger the final result, which is emitted through a periodic timer registered in a user-defined process function.
I have tried opening the file source with 'FileProcessingMode.PROCESS_CONTINUOUSLY', but that seems to be more of a hack than a solution. I have also tried a watermark strategy with punctuated watermarks, but I am not sure how to capture the END_OF_INPUT from the source and forward an appropriate watermark (e.g. Long.MAX_VALUE).
Is there a way to ensure that my timer is triggered even if the source events have been exhausted?
With a bounded source, Flink will automatically send a watermark with the value Watermark.MAX_WATERMARK after all of the input has been consumed. So it should be enough to create an event-time timer for Long.MAX_VALUE.
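To make the mechanism concrete, here is a small plain-Java toy model of it. This is not Flink code: the class EndOfInputTimerSketch and its methods processElement, processWatermark and output are hypothetical stand-ins that only imitate how an event-time timer registered for Long.MAX_VALUE stays pending until the final watermark arrives. In a real job the registration would go through ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE) inside processElement, with the final result emitted from onTimer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of event-time timers. NOT Flink code; all names are hypothetical. */
class EndOfInputTimerSketch {
    /** Stand-in for the timestamp carried by Flink's Watermark.MAX_WATERMARK. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Pending timer timestamps; a set, so registering the same timestamp twice is a no-op.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();
    private long sum = 0;

    /** Called once per input record: update state and (re-)register the end-of-input timer. */
    void processElement(long value) {
        sum += value;
        timers.add(MAX_WATERMARK);
    }

    /** Called when a watermark arrives: fire every timer whose timestamp is <= the watermark. */
    void processWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    /** Timer callback: a timer at MAX_WATERMARK can only fire once the input is exhausted. */
    private void onTimer(long timestamp) {
        if (timestamp == MAX_WATERMARK) {
            output.add("final sum = " + sum);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        EndOfInputTimerSketch op = new EndOfInputTimerSketch();
        op.processElement(1);
        op.processElement(2);
        op.processWatermark(1_000L);        // ordinary watermark: the timer stays pending
        op.processWatermark(MAX_WATERMARK); // what a bounded source sends after its last record
        System.out.println(op.output());
    }
}
```

The periodic processing-time timers from the question can stay as they are; in this sketch the Long.MAX_VALUE timer is an additional event-time timer that exists only to flush the final result.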