Tags: apache-flink, flink-streaming, watermark, flink-batch

Apache Flink: Batch mode failing for DataStream APIs with exception `IllegalStateException: Checkpointing is not allowed with sorted inputs.`


This is a continuation of: Flink: Handling Keyed Streams with data older than application watermark

Based on the suggestion there, I have been trying to add batch support to the same Flink application, which uses the DataStream API.

The logic is something like this:

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile("fileName")
    .process(/* process function which transforms input */)
    .assignTimestampsAndWatermarks(WatermarkStrategy
            .<DetectionEvent>forBoundedOutOfOrderness(orderness)
            .withTimestampAssigner((event, timestamp) -> event.getEventTime()))
    .keyBy(keyFunction)
    .window(TumblingEventTimeWindows.of(Time.days(x)))
    .process(processWindowFunction);

Based on the public docs, my understanding was that I simply needed to change the source to a bounded one. However, the above job keeps failing when the windows fire after the windowing step, with the exception below:

java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)

The input file contains the historical events for multiple keys. The data for a given key is sorted, but the overall data is not. I have also added an event at the end of each key with timestamp = MAX_WATERMARK to indicate the end of that keyed stream. I tried it with a single key as well, but the processing failed with the same exception.

Note: I have not enabled checkpointing. I have also tried explicitly disabling checkpointing, to no avail:

env.getCheckpointConfig().disableCheckpointing();

EDIT - 1

Adding more details: I tried switching to FileSource to read the files, but I still get the same exception.

environment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")

The first process step and the key splitting work. However, the job fails after that. I tried removing the windowing and adding a simple process step instead, but it continues to fail. There is no explicit sink. The last process function simply updates a database.

Attaching images (process, exception, timeline) in case they help.

Is there something I'm missing?


Solution

  • That exception can only be thrown if checkpointing is enabled. Perhaps you have a checkpointing interval configured in flink-conf.yaml?
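
One way to follow up on this suggestion is to scan the cluster configuration for checkpointing settings. The sketch below is only an illustration and is not part of Flink: the class name `CheckpointConfigScan`, the helper `findCheckpointSettings`, and the default path `conf/flink-conf.yaml` are all hypothetical. It assumes the file uses flat `key: value` lines and simply reports every key starting with `execution.checkpointing` (for example `execution.checkpointing.interval`).

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class CheckpointConfigScan {

    // Returns the non-comment "key: value" lines whose key starts with
    // "execution.checkpointing". Assumes flat keys, not nested YAML.
    static List<String> findCheckpointSettings(List<String> confLines) {
        List<String> hits = new ArrayList<>();
        for (String raw : confLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue; // not a key: value line
            }
            String key = line.substring(0, colon).trim();
            if (key.startsWith("execution.checkpointing")) {
                hits.add(line);
            }
        }
        return hits;
    }

    public static void main(String[] args) throws Exception {
        // The default path is an assumption; pass the real location as an argument.
        Path conf = Paths.get(args.length > 0 ? args[0] : "conf/flink-conf.yaml");
        for (String hit : findCheckpointSettings(Files.readAllLines(conf))) {
            System.out.println(hit);
        }
    }
}
```

If this prints an `execution.checkpointing.interval` entry, checkpointing is being enabled from the configuration file even though the job code never enables it, which would be consistent with the exception above.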