I'm trying to create my own windowing scheme through the use of unkeyed ProcessFunctions. I'm reading from a source and would like to use watermarks. My current watermark strategy is as follows:
    this.watermarkStrategy = WatermarkStrategy
            .<EventBasic>forMonotonousTimestamps()
            .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
I've created my source as follows:
    DataStream<EventBasic> mainStream = env.readTextFile(csvFilePath)
            .map(new MapFunction<String, EventBasic>() {
                @Override
                public EventBasic map(String line) throws Exception {
                    // Expected line format: key,val,timestamp
                    String[] parts = line.split(",");
                    if (parts.length == 3) {
                        String key = parts[0];
                        int valueInt = Integer.parseInt(parts[1]);
                        long valueTimeStamp = Long.parseLong(parts[2]);
                        return new EventBasic(key, valueInt, valueTimeStamp);
                    } else {
                        return null;
                    }
                }
            })
            .setParallelism(3)
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .name("source");
This source reads a CSV file that has the following format:
key,val,timestamp
A,0,500
C,1,500
A,2,500
A,3,500
A,4,500
B,5,500
A,6,500
H,7,500
...
a,100,1500
The timestamps increase monotonically.
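As a side note on the parsing step, here is a minimal plain-Java sketch of parsing one line of this format while skipping the header and malformed lines instead of returning null. `ParsedEvent` and `CsvLineParser` are hypothetical names for illustration (the real `EventBasic` class is not shown in the question), and this is only one possible way to handle bad lines.

```java
import java.util.Optional;

/** Hypothetical stand-in for EventBasic, whose definition is not shown. */
final class ParsedEvent {
    final String key;
    final int value;
    final long timestamp;

    ParsedEvent(String key, int value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class CsvLineParser {
    /** Returns the parsed event, or Optional.empty() for headers and malformed lines. */
    static Optional<ParsedEvent> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            int value = Integer.parseInt(parts[1].trim());
            long timestamp = Long.parseLong(parts[2].trim());
            return Optional.of(new ParsedEvent(parts[0].trim(), value, timestamp));
        } catch (NumberFormatException e) {
            // Non-numeric fields, e.g. the header line "key,val,timestamp".
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("A,0,500").isPresent());           // true
        System.out.println(parse("key,val,timestamp").isPresent()); // false
    }
}
```

In a Flink job, the same idea could be applied with a `flatMap` that only emits a record when parsing succeeds, so that no null elements reach the timestamp assigner.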
When I observe the current watermark immediately downstream (I created a dummy ProcessFunction to check that my timestamps were working), it is constantly -9223372036854775808, i.e. Long.MIN_VALUE.
This means that the watermark never advances, even though the timestamps are assigned.
I've also tried the following watermark strategy, which led to the same output:
    this.watermarkStrategy = WatermarkStrategy
            .<EventBasic>forBoundedOutOfOrderness(Duration.ofMillis(500))
            .withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
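For reference, here is a small plain-Java sketch of the arithmetic that, to my understanding, Flink's built-in bounded-out-of-orderness generator uses (with the monotonous-timestamps strategy being the zero-delay case). `BoundedWatermarkSketch` is a hypothetical class for illustration, not Flink code, and it ignores that Flink emits watermarks periodically rather than after every record. It shows why a generator that has seen no events reports Long.MIN_VALUE.

```java
import java.time.Duration;

/** Toy model of a bounded-out-of-orderness watermark generator (not Flink code). */
final class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so that the watermark before any event is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Tracks the highest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = highest timestamp seen, minus the allowed delay, minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch bounded = new BoundedWatermarkSketch(Duration.ofMillis(500));
        System.out.println(bounded.currentWatermark() == Long.MIN_VALUE); // true
        bounded.onEvent(500);
        bounded.onEvent(1500);
        System.out.println(bounded.currentWatermark()); // 999

        BoundedWatermarkSketch monotonous = new BoundedWatermarkSketch(Duration.ZERO);
        monotonous.onEvent(500);
        monotonous.onEvent(1500);
        System.out.println(monotonous.currentWatermark()); // 1499
    }
}
```

So with the sample data, a watermark near 999 or 1499 would be expected once events have been seen; a value stuck at Long.MIN_VALUE indicates that no generated watermark is reaching the operator at all.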
I don't know what my issue could be and I've tried looking everywhere but nothing seems to change.
It turns out the reason the watermarks were not advancing, even though the timestamps were assigned correctly, is that I had set the environment to BATCH mode instead of STREAMING mode (env.setRuntimeMode(RuntimeExecutionMode.STREAMING)). In BATCH mode Flink assumes the whole input is known in advance, so the watermark generator of the WatermarkStrategy is ignored (only its timestamp assigner is used) and a single final watermark is emitted at the end of the input.
Source: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution_mode/
I hope this helps anyone who runs into the same issue.