Our use case: we want to use Flink streaming for a de-duplication job that reads its data from a source (Kafka topic) and writes unique records to an HDFS file sink. The Kafka topic can contain duplicate data, and a duplicate can be identified by a composite key (adserver_id, unix_timestamp of the record),
so I decided to use Flink's keyed state to achieve de-duplication.
// The Kafka consumer is assumed to deserialize records into DCNRecord,
// since the operators below access adserver_id and event_timestamp.
val messageStream: DataStream[DCNRecord] = env.addSource(flinkKafkaConsumer)

messageStream
  .map { record =>
    // composite de-duplication key: adserver_id + event timestamp
    val key = record.adserver_id.get + record.event_timestamp.get
    (key, record)
  }
  .keyBy(_._1)                    // partition the stream by the composite key
  .flatMap(new DedupDCNRecord())  // emit each key at most once
  .map(_.toString)
  .addSink(sink)

// execute the stream
env.execute(applicationName)
Here is the code for de-duplication using Flink's ValueState.
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class DedupDCNRecord extends RichFlatMapFunction[(String, DCNRecord), DCNRecord] {
  // keyed state: non-null once a record with the current key has been emitted
  private var operatorState: ValueState[String] = _

  override def open(configuration: Configuration): Unit = {
    operatorState = getRuntimeContext.getState(DedupDCNRecord.descriptor)
  }

  @throws[Exception]
  override def flatMap(value: (String, DCNRecord), out: Collector[DCNRecord]): Unit = {
    if (operatorState.value == null) { // we haven't seen this key yet
      out.collect(value._2)
      // remember the key so that we don't emit records with this key again
      operatorState.update(value._1)
    }
  }
}
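The descriptor referenced in open() is not shown in the question; a minimal companion-object sketch would look like the following (the state name "dedup-seen-key" is an assumption, any unique name works):

object DedupDCNRecord {
  import org.apache.flink.api.common.state.ValueStateDescriptor
  // state name is assumed, not taken from the question
  val descriptor = new ValueStateDescriptor[String]("dedup-seen-key", classOf[String])
}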
This approach works fine as long as the streaming job keeps running: it maintains the set of keys seen so far in ValueState and de-duplicates against it. But as soon as I cancel the job, Flink loses that state (the unique keys seen in the previous run) and only keeps the keys seen in the current run, so records that were already processed in a previous run pass through again. Is there a way to force Flink to retain the ValueState (unique keys) seen so far? Appreciate your help.
This requires that you capture a snapshot (savepoint) of the job's state before shutting the job down, and then restart from that snapshot:
For a step-by-step tutorial, see Upgrading & Rescaling a Job in the Flink Operations Playground. The section on Observing Failure & Recovery is also relevant here.
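In practice that means stopping the job with a savepoint (for example flink stop --savepointPath <dir> <jobId>, or flink savepoint <jobId> <dir> followed by a cancel) and resubmitting it with flink run -s <savepointPath> .... On the job side, here is a minimal sketch of the related configuration, assuming an HDFS directory for snapshot storage and an older Flink version where FsStateBackend is still the idiomatic choice (the paths and intervals below are placeholders, not taken from the question):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// durable state backend so snapshots (checkpoints/savepoints) are written to HDFS
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))

// periodic checkpoints for automatic failure recovery (every 60 s)
env.enableCheckpointing(60000)

// also retain the latest checkpoint when the job is cancelled, so it can be
// used as a restore point in addition to explicitly triggered savepoints
env.getCheckpointConfig.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

With this in place, the keys stored in ValueState survive a stop/restart cycle as long as the new run is started from the savepoint (or retained checkpoint) of the previous run.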