I use Spark Streaming to read Kafka data and process every line.
I use the code below to create a stream:
lines = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
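A minimal sketch of the kind of kafkaParams this assumes (broker address and group id are placeholders); manual commits via commitAsync only make sense with enable.auto.commit set to false:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");        // placeholder broker
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-consumer-group");              // placeholder group id
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);                  // offsets are committed manually via commitAsync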
and then I use this code to process the data from Kafka:
lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
        // get the Kafka offset range of this partition
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        OffsetRange[] range = new OffsetRange[1];
        range[0] = o;
        // cache the line data
        List<String> jsonData = new ArrayList<>();
        // read all line data
        while (partitionOfRecords.hasNext()) {
            ConsumerRecord<String, String> line = partitionOfRecords.next();
            jsonData.add(line.value());
        }
        // TODO do my own business logic with jsonData
        // .......
        // HOW can I commit the Kafka offset here??
        // this is a method to commit offsets
        ((CanCommitOffsets) lines.inputDStream()).commitAsync(range);
    });
});
I have tried this many times and found it has some problems:
What happens if my data is processed successfully in one partition while another partition fails? Does that mean all of my processed data has to come back (be reprocessed)? Because the Kafka offsets have already been committed.
I have run this code and found that the commit only actually takes effect the next time the executor runs this RDD. Does that mean that if the process OOMs or is killed, some of the data I read from Kafka next time will be duplicated?
What happens if my data is processed successfully in one partition while another partition fails? Does that mean all of my processed data has to come back (be reprocessed)? Because the Kafka offsets have already been committed.
If a particular task fails, Spark will attempt to re-execute it in place, up to the spark.task.maxFailures
setting. If that number is exceeded, the entire job fails. You need to make sure that if the part before commitAsync
fails, you don't commit the offsets.
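A minimal sketch of that pattern, assuming the same lines stream and topics as in your question: capture the offset ranges on the driver, do all the per-partition work inside foreachRDD, and only call commitAsync on the driver after foreachPartition has returned, i.e. after every partition was processed without throwing:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
    // capture the offset ranges of this batch on the driver
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
        List<String> jsonData = new ArrayList<>();
        while (partitionOfRecords.hasNext()) {
            jsonData.add(partitionOfRecords.next().value());
        }
        // process jsonData here; let exceptions propagate so a failed
        // partition fails the task (and, after spark.task.maxFailures, the job)
    });

    // runs on the driver, only if every partition above completed successfully,
    // so offsets are never committed for a batch that failed
    ((CanCommitOffsets) lines.inputDStream()).commitAsync(offsetRanges);
});

Note that commitAsync only queues the offsets; the driver's consumer performs the actual commit later, effectively at the next batch, which matches what you observed and is why your second question still applies.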
I have run this code and found that the commit only actually takes effect the next time the executor runs this RDD. Does that mean that if the process OOMs or is killed, some of the data I read from Kafka next time will be duplicated?
Yes. If the job is killed before the next batch iteration, Spark will attempt to re-read data that's already been processed.
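If you need to avoid those duplicates, the usual approach is not to rely on Kafka commits alone, but to write your results and the offsets to your own store in one transaction (or make the writes idempotent). A rough sketch under assumed names: the JDBC URL and the results/kafka_offsets tables are purely hypothetical, and the Spark/Kafka imports are the same as in the sketch above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.TaskContext;

lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        // connection string and table names below are hypothetical
        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://db-host/mydb")) {
            conn.setAutoCommit(false);
            try (PreparedStatement insert =
                     conn.prepareStatement("INSERT INTO results (payload) VALUES (?)");
                 PreparedStatement saveOffset =
                     conn.prepareStatement("UPDATE kafka_offsets SET until_offset = ?"
                         + " WHERE topic = ? AND kafka_partition = ?")) {
                while (partitionOfRecords.hasNext()) {
                    insert.setString(1, partitionOfRecords.next().value());
                    insert.executeUpdate();
                }
                // store the end of this partition's offset range in the same transaction
                saveOffset.setLong(1, o.untilOffset());
                saveOffset.setString(2, o.topic());
                saveOffset.setInt(3, o.partition());
                saveOffset.executeUpdate();
            }
            conn.commit(); // results and offsets become durable together, or not at all
        }
    });
});

On restart you would then seed the stream from the offsets stored in your own table (ConsumerStrategies.Subscribe has an overload that accepts starting offsets) instead of relying on the committed group offsets.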