apache-kafka · duplicates · flink-streaming · exactly-once

Does a Flink streaming job maintain its keyed value state between job runs?


Our use case is to run a Flink streaming job as a de-duplicator: it reads its data from a source (Kafka topic) and writes unique records to an HDFS file sink. The Kafka topic can contain duplicate data, which can be identified by a composite key (adserver_id, unix_timestamp of the record).

So I decided to use Flink keyed state to achieve the de-duplication.

val messageStream: DataStream[DCNRecord] = env.addSource(flinkKafkaConsumer)

messageStream
  .map { record =>
    // build the composite de-duplication key
    val key = record.adserver_id.get + record.event_timestamp.get
    (key, record)
  }
  .keyBy(_._1)
  .flatMap(new DedupDCNRecord())
  .map(_.toString)
  .addSink(sink)

// execute the stream
env.execute(applicationName)

Here is the code for the de-duplication using Flink's value state.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class DedupDCNRecord extends RichFlatMapFunction[(String, DCNRecord), DCNRecord] {
  // keyed state: one value per de-duplication key, scoped by the upstream keyBy
  private var seenKey: ValueState[String] = _

  override def open(configuration: Configuration): Unit = {
    seenKey = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("dedup-seen-key", classOf[String])
    )
  }

  @throws[Exception]
  override def flatMap(value: (String, DCNRecord), out: Collector[DCNRecord]): Unit = {
    if (seenKey.value == null) { // we haven't seen this key yet
      out.collect(value._2)
      // remember the key so records with this key are not emitted again
      seenKey.update(value._1)
    }
  }
}

This approach works fine as long as the streaming job keeps running: it maintains the set of unique keys in value state and de-duplicates against it. But as soon as I cancel the job, Flink loses that state (the unique keys seen in the previous run) and only keeps the unique keys for the current run, so records that were already processed in the previous run pass through again. Is there a way to force Flink to retain the value state (unique keys) it has seen so far? I appreciate your help.


Solution

  • This requires that you capture a snapshot of the job's state before shutting it down, and then restart from that snapshot:

    1. Do a stop with savepoint to bring down your current job while taking a snapshot of its state.
    2. Relaunch, using the savepoint as the starting point.

    For a step-by-step tutorial, see Upgrading & Rescaling a Job in the Flink Operations Playground. The section on Observing Failure & Recovery is also relevant here.
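
    With the Flink CLI, the two steps look roughly like this; the job ID, savepoint directory, and jar path below are placeholders for your own values:

    # 1. Stop the job, writing a savepoint of its state in one step
    flink stop --savepointPath hdfs:///savepoints <job-id>

    # 2. Relaunch the job, restoring the keyed state from that savepoint
    flink run -s hdfs:///savepoints/savepoint-<id> your-dedup-job.jar

    One caveat: for the savepoint's state to map back onto your operators across runs, give the stateful operators stable IDs (e.g. call .uid("dedup") on the flatMap in your pipeline); otherwise Flink's auto-generated operator IDs can change when the job graph changes.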