I have a service that consumes from Kafka and stores the data to a database. The logic, simplified, looks like this:
Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();
kafkaFlux.flatMap(r -> store(r))// IO operation, store to database
.subscribe(record -> record.receiverOffset().acknowledge()); // Ack the record
The flatMap can reorder the records in the flux, so acknowledge() could commit an offset even though earlier records in the same partition haven't been stored to the database yet. From the reactor-kafka ReceiverOffset documentation:
https://projectreactor.io/docs/kafka/snapshot/api/reactor/kafka/receiver/ReceiverOffset.html
Acknowledges the ReceiverRecord associated with this offset. The offset will be committed automatically based on the commit configuration parameters ReceiverOptions.commitInterval() and ReceiverOptions.commitBatchSize(). When an offset is acknowledged, it is assumed that all records in this partition up to and including this offset have been processed. All acknowledged offsets are committed if possible when the receiver Flux terminates.
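For example, the obvious fix of processing records one at a time keeps the acknowledgements in order, but it serializes the database writes. This sketch assumes store(r) returns a Mono that emits the record once it has been written:

// Ordered but slow: concatMap subscribes to store(r) one record at a time,
// so an offset is only acknowledged after all earlier records are stored.
kafkaFlux.concatMap(r -> store(r))
         .subscribe(record -> record.receiverOffset().acknowledge());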
How can I guarantee at-least-once delivery without blocking the stream?
Starting with version 1.3.8, commits can be performed out of order and the framework will defer the commits as needed, until any "gaps" are filled. This removes the need for applications to keep track of offsets and commit them in the right order.
You can set maxDeferredCommits on your ReceiverOptions to enable the out-of-order commits feature.
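For reference, a minimal sketch of what that could look like, assuming reactor-kafka 1.3.8 or later; the broker address, group id, topic name, deferred-commit limit, and the store(...) helper are placeholders:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class OutOfOrderCommitExample {

    public static void main(String[] args) {
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",   // placeholder broker
                ConsumerConfig.GROUP_ID_CONFIG, "my-group",                  // placeholder group id
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        ReceiverOptions<String, byte[]> options = ReceiverOptions.<String, byte[]>create(props)
                // Allow up to 100 out-of-order acknowledgements per partition (value chosen arbitrarily);
                // commits of later offsets are deferred until the earlier "gaps" are acknowledged too.
                .maxDeferredCommits(100)
                .subscription(Collections.singleton("my-topic"));            // placeholder topic

        KafkaReceiver.create(options).receive()
                .flatMap(r -> store(r).thenReturn(r))                        // concurrent IO, may finish out of order
                .subscribe(r -> r.receiverOffset().acknowledge());           // commit is deferred until gaps close
    }

    // Placeholder for the real database write.
    private static Mono<Void> store(ReceiverRecord<String, byte[]> record) {
        return Mono.empty();
    }
}

With maxDeferredCommits set, acknowledging a later offset does not immediately advance the committed position; the receiver defers the commit until all earlier offsets in the partition have been acknowledged, which preserves at-least-once delivery while keeping the concurrent flatMap.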