I have a sample streaming WordCount example written in Flink (Scala). I want to use externalized checkpointing so that the job restores its state in case of failure, but it is not working as expected.
My code is as follows:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)
    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // checkpoints have to complete within one minute, or they are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // do not fail the tasks if an error happens during checkpointing; the checkpoint is simply declined
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // prepare Kafka consumer properties
    val kafkaConsumerProperties = new Properties
    kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
    kafkaConsumerProperties.setProperty("group.id", "flink")
    kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")

    // set up the Kafka consumer
    val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)

    println("Executing WordCount example.")

    // get text from Kafka
    val text = env.addSource(kafkaConsumer)

    val counts: DataStream[(String, Int)] = text
      // split up the lines into pairs (2-tuples) containing: (word, 1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .mapWithState((in: (String, Int), count: Option[Int]) =>
        count match {
          case Some(c) => ((in._1, c), Some(c + in._2))
          case None    => ((in._1, 1), Some(in._2 + 1))
        })

    // emit result
    println("Printing result to stdout.")
    counts.print()

    // execute program
    env.execute("Streaming WordCount")
  }
}
The output I get after running the program the first time is:
(hi, 1)
(hi, 2)
The output I get after running the program a second time is:
(hi, 1)
My expectation is that running the program a second time should give me the following output:
(hi, 3)
Since I am a newbie to Apache Flink, I don't know how to achieve the expected result. Can anyone help me get the correct behavior?
Flink only automatically restarts from the latest checkpoint if the application is recovered within the same execution, i.e., during regular automatic failure recovery.
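Automatic recovery is governed by the job's restart strategy. Once checkpointing is enabled, Flink already uses a fixed-delay restart strategy by default, but you can also set it explicitly; a minimal sketch (the retry count and delay are just example values):

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time

// on failure, retry the job up to 3 times, waiting 10 seconds between
// attempts; each attempt resumes from the latest completed checkpoint
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))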
If you cancel a job running in a local execution environment in the IDE, you kill the whole cluster, and the job cannot be automatically recovered; instead, you need to start it again. To start a new job from a savepoint (or an externalized checkpoint), you have to provide the path to the persisted savepoint/checkpoint, e.g. via bin/flink run -s <savepointPath> ... on the command line. I am not sure whether that is possible with a local execution environment.
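Note also that your code configures the RocksDB state backend but never enables externalized checkpoints: the boolean flag in the RocksDBStateBackend constructor turns on incremental checkpointing, not externalization, so the checkpoint is deleted when you cancel the job. A sketch of the missing piece (assuming a Flink version where this setting lives in CheckpointConfig):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

// retain the checkpoint when the job is cancelled, so it can be used
// to manually start a new job later (you must clean it up yourself)
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)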
IMO it is easier to play around with checkpointing and recovery on a local Flink instance (start one with ./bin/start-cluster.sh from the Flink distribution, then submit, cancel, and resume jobs with ./bin/flink) than from within an IDE.
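For completeness: more recent Flink versions (1.12+) can reportedly resume from a savepoint or retained checkpoint even in a local environment by setting execution.savepoint.path in the configuration passed to the environment. A sketch under that assumption, where the checkpoint path is only a placeholder:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val conf = new Configuration()
// point the new job at the retained checkpoint's metadata (placeholder path)
conf.setString("execution.savepoint.path", "file:///path/to/checkpoint/<job-id>/chk-1")
// an environment created with this configuration restores state from that path
val env = StreamExecutionEnvironment.getExecutionEnvironment(conf)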