apache-kafka, project-reactor, reactor-kafka

Reactor Kafka: how to guarantee at-least-once processing after a flatMap


I have a service that consumes records from Kafka and stores them in a database. The simplified logic is:

Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();
kafkaFlux
    .flatMap(r -> store(r)) // IO operation: store to database, emits r once stored
    .subscribe(record -> record.receiverOffset().acknowledge()); // ack the record

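Because flatMap runs the store operations concurrently, records can come out of it in a different order than they were received. According to the Reactor Kafka documentation, acknowledge() treats every earlier offset in the partition as processed, so acknowledging one record may also acknowledge earlier records that have not been stored in the database yet: https://projectreactor.io/docs/kafka/snapshot/api/reactor/kafka/receiver/ReceiverOffset.html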

Acknowledges the ReceiverRecord associated with this offset. The offset will be committed automatically based on the commit configuration parameters ReceiverOptions.commitInterval() and ReceiverOptions.commitBatchSize(). When an offset is acknowledged, it is assumed that all records in this partition up to and including this offset have been processed. All acknowledged offsets are committed if possible when the receiver Flux terminates.
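To make the hazard concrete, here is a small, simplified model in plain Java. It is not reactor-kafka code: the class NaiveAckDemo and the method lostAfterCrash are hypothetical, and it assumes the commit position simply jumps to one past the highest acknowledged offset, following the documented "up to and including this offset" rule. Offsets 0..4 are received in order, the database writes finish in a different order, and the service crashes before every write completes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of the out-of-order ack hazard (not reactor-kafka code).
 * Acknowledging offset N is taken to mean "all records up to and including N are
 * processed", so the commit position jumps to one past the highest acknowledged offset.
 */
class NaiveAckDemo {

    /**
     * Offsets 0..n-1 are received in order, but their database writes finish in
     * completionOrder. The service crashes after the first completedBeforeCrash writes.
     * Returns the offsets a restarted consumer would never read again.
     */
    static List<Long> lostAfterCrash(long[] completionOrder, int completedBeforeCrash) {
        Set<Long> stored = new HashSet<>();
        long committed = 0; // Kafka convention: next offset to read on restart
        for (int i = 0; i < completedBeforeCrash; i++) {
            long offset = completionOrder[i];
            stored.add(offset);
            // Naive ack in completion order: the commit position follows the highest ack.
            committed = Math.max(committed, offset + 1);
        }
        List<Long> lost = new ArrayList<>();
        for (long o = 0; o < committed; o++) {
            if (!stored.contains(o)) {
                lost.add(o); // below the committed position but never stored
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // Writes for offsets 0, 3 and 1 finish; the write for offset 2 is still in flight.
        System.out.println(lostAfterCrash(new long[] {0, 3, 1, 4, 2}, 3)); // [2]
    }
}
```

Offset 3 was acknowledged while offset 2 was still being written, so after the restart the consumer resumes at offset 4 and record 2 is never stored, which breaks at-least-once.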

How can I guarantee at-least-once processing without blocking the stream?


Solution

  • Starting with version 1.3.8, commits can be performed out of order and the framework will defer the commits as needed, until any "gaps" are filled. This removes the need for applications to keep track of offsets and commit them in the right order.

    To enable out-of-order commits, set maxDeferredCommits in your ReceiverOptions to a positive value.
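The idea of deferring commits until gaps are filled can be sketched as a single-partition tracker in plain Java. This is only an illustration under stated assumptions, not reactor-kafka's internal implementation: DeferredCommitTracker, received, acknowledge, committableOffset and deferredCount are hypothetical names. It tracks offsets in the order they were received, rather than assuming they are contiguous, and advances the committable position only over a prefix in which every record has been acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical single-partition model of deferred (out-of-order) commits.
 * Not reactor-kafka code: the committable position moves past a record only
 * after every earlier received record has also been acknowledged.
 */
class DeferredCommitTracker {
    // Offsets in the order they were received (ascending within a partition).
    private final Deque<Long> inFlight = new ArrayDeque<>();
    // Offsets acknowledged while an earlier record was still in flight.
    private final Set<Long> acked = new HashSet<>();
    // Kafka convention: committed offset = offset of the next record to read.
    private long committed;

    DeferredCommitTracker(long startOffset) {
        this.committed = startOffset;
    }

    /** Called when a record is received, before it is processed. */
    void received(long offset) {
        inFlight.addLast(offset);
    }

    /** Called when processing of a record has finished; may happen in any order. */
    void acknowledge(long offset) {
        if (offset < committed) {
            return; // already covered by the committable position
        }
        acked.add(offset);
        // Advance over the contiguous prefix of acknowledged records.
        while (!inFlight.isEmpty() && acked.remove(inFlight.peekFirst())) {
            committed = inFlight.pollFirst() + 1;
        }
    }

    /** Offset that is safe to commit: every earlier received record is done. */
    long committableOffset() {
        return committed;
    }

    /** Acknowledged records still waiting for an earlier gap to be filled. */
    int deferredCount() {
        return acked.size();
    }

    public static void main(String[] args) {
        DeferredCommitTracker t = new DeferredCommitTracker(0);
        for (long o = 0; o < 4; o++) {
            t.received(o);
        }
        t.acknowledge(2); // out of order: 0 and 1 are still being stored
        t.acknowledge(3);
        System.out.println(t.committableOffset() + " " + t.deferredCount()); // 0 2
        t.acknowledge(0);
        System.out.println(t.committableOffset() + " " + t.deferredCount()); // 1 2
        t.acknowledge(1); // gap filled, the deferred acks 2 and 3 are released
        System.out.println(t.committableOffset() + " " + t.deferredCount()); // 4 0
    }
}
```

In reactor-kafka you do not write this yourself. The feature is switched on with something like `ReceiverOptions.<String, byte[]>create(props).maxDeferredCommits(100)`, where 100 is an arbitrary example value. As I understand it, the consumer is paused while the number of deferred commits exceeds that limit, so every received record must eventually be acknowledged, including records whose store(r) call fails. Otherwise the gap never closes.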