I am writing an application using Apache Flink to replace an older application. The old application is fed with events from different sources and monitors whether those sources meet certain criteria (e.g. sensor values exceeding thresholds).
It reports a source if its thresholds are exceeded for a given period (days to weeks).
The new application will do the same: it consumes two Kafka topics (event data and config data) and writes an alert to a third topic if the time period is exceeded for a monitored source (using the timer service).
The job graph produced is two source topics (configs, events) feeding a KeyedCoProcessFunction, which writes to a sink topic (alerts).
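For illustration, a minimal sketch of what such a KeyedCoProcessFunction can look like, assuming a String machine id as key, an Alert type for the side output, and simple getValue()/getThreshold()/getAlertPeriodMillis() accessors (these names are assumptions, not the actual implementation):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Keyed by machine id; sensor events on input 1, configs on input 2.
// Alerts are emitted via the side output, the main output stays unused.
public class HandleEventDataCoProcess
        extends KeyedCoProcessFunction<String, SensorEvent, MachineConfig, Void> {

    private final OutputTag<Alert> outputTag;

    private transient ValueState<MachineConfig> configState;  // latest config for this machine
    private transient ValueState<Long> pendingTimerState;     // timer registered for the alert period

    public HandleEventDataCoProcess(OutputTag<Alert> outputTag) {
        this.outputTag = outputTag;
    }

    @Override
    public void open(Configuration parameters) {
        configState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("config", MachineConfig.class));
        pendingTimerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingTimer", Long.class));
    }

    @Override
    public void processElement1(SensorEvent event, Context ctx, Collector<Void> out) throws Exception {
        MachineConfig config = configState.value();
        if (config == null) {
            return; // no config received for this machine yet
        }
        if (event.getValue() > config.getThreshold()) {
            if (pendingTimerState.value() == null) {
                // first violation: schedule an alert for the end of the monitoring period
                long fireAt = ctx.timerService().currentProcessingTime() + config.getAlertPeriodMillis();
                ctx.timerService().registerProcessingTimeTimer(fireAt);
                pendingTimerState.update(fireAt);
            }
        } else {
            // back below the threshold: cancel the pending alert, if any
            Long timer = pendingTimerState.value();
            if (timer != null) {
                ctx.timerService().deleteProcessingTimeTimer(timer);
                pendingTimerState.clear();
            }
        }
    }

    @Override
    public void processElement2(MachineConfig config, Context ctx, Collector<Void> out) throws Exception {
        configState.update(config);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Void> out) throws Exception {
        // the threshold was exceeded for the whole period: report the machine
        ctx.output(outputTag, new Alert(ctx.getCurrentKey(), timestamp));
        pendingTimerState.clear();
    }
}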
I am looking for a way to migrate all monitored sources from the old application into the Flink keyed state.
The sources will be exported as a file containing all the information needed to initialize the new application.
I wrote a second (initializer) application with a graph that is similar but not identical: one topic source (configs), one collection source (a file reader providing all monitored sources) and a KeyedCoProcessFunction that has the same state definitions as the replacement application.
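Wiring-wise, the initializer job looks roughly like this; readExportedSources, the deserialization schema, the broker address and the uid strings are placeholders for illustration only:

// Monitored sources exported from the old application, read from the export file.
List<SensorEvent> exportedSources = readExportedSources("/path/to/export-file");

// Collection source instead of the events topic; no shared uid here, since the
// new application uses the Kafka events source in this position.
DataStream<SensorEvent> initialEvents = env
        .fromCollection(exportedSources)
        .name("Source: Events");

// Same config topic and same uid as in the new application.
KafkaSource<MachineConfig> configSource = KafkaSource.<MachineConfig>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("configs")
        .setValueOnlyDeserializer(new MachineConfigDeserializationSchema())
        .build();
DataStream<MachineConfig> configs = env
        .fromSource(configSource, WatermarkStrategy.noWatermarks(), "Source: Configs")
        .uid("ConfigSourceUID");

// Same KeyedCoProcessFunction (same state definitions), same outputTag/sink,
// and same uids as in the new application.
initialEvents.keyBy(SensorEvent::getMachineId)
        .connect(configs.keyBy(MachineConfig::getMachineId))
        .process(new HandleEventDataCoProcess(outputTag))
        .uid("CoProcessUID")
        .getSideOutput(outputTag)
        .sinkTo(outlierSink)
        .uid("OutlierSinkUID");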
The idea was to run this initializer application once to load all monitored sources into keyed state, take a savepoint, and then start the new application from that savepoint.
I used the same uids for the config source and the KeyedCoProcessFunction. Still, the state from the savepoint was not ingested properly; the JobManager log on restore shows:
Skipping empty savepoint state for operator 1db5b24325c47f4692485c6f3204cb3b. // new event source
Reset the checkpoint ID of job d92e6e970306ea6465b0f253beddebbe to 6.
Restoring job d92e6e970306ea6465b0f253beddebbe from Savepoint 5 @ 0 for d92e6e970306ea6465b0f253beddebbe located at file:/tmp/savepoint-05ed46-60bd264d64df.
No master state to restore
Resetting coordinator to checkpoint.
Closing SourceCoordinator for source Source: Configs.
Source coordinator for source Source: Configs closed.
Restoring SplitEnumerator of source Source: Configs from checkpoint.
Resetting coordinator to checkpoint.
Closing SourceCoordinator for source Source: Events.
Source coordinator for source Source: Events closed.
Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@1bd0b5ba for Monitor_v1.0 (d92e6e970306ea6465b0f253beddebbe).
Starting execution of job 'Monitor_v1.0' (d92e6e970306ea6465b0f253beddebbe) under job master id 00000000000000000000000000000000.
Has anyone faced the same challenge, or does anyone have an idea how I could tackle this situation?
Thanks a lot BR M
We were able to find the underlying issue of why the state was not restored properly.
In the code block where the two operators and the sink are joined via the CoProcessFunction, I had only provided a uid/name to the sink and not to the function itself. Since the graph built for the initializer application therefore differed from the graph of the application that should take over, Flink was not able to map the state properly.
Using env.disableOperatorChaining() provided some extra insight into the situation by displaying every operator as a separate node in the job graph.
sensorStreamOperator
        .keyBy(SensorEvent::getMachineId)
        .connect(configStreamOperator.keyBy(MachineConfig::getMachineId))
        .process(new HandleEventDataCoProcess(outputTag))
        .uid("CoProcessUID")   // provide a separate uid to the process function!
        .name("CoProcessFunction")
        .getSideOutput(outputTag)
        .sinkTo(outlierSink)
        .uid("OutlierSinkUID")
        .name("OutlierSink");