In my project, I use spring-kafka (v3.1.1) and reactor-kafka (v1.3.22) to consume events from a topic; let's call it "topic-a". The downstream consumer processes each message in about 50 ms. When a continuous stream of events is produced to the topic (e.g. 1.1 million events in a row), they are evenly distributed across the topic's 5 partitions, with each partition holding about 225k records. max.poll.records is set to 40 (I set it to 200 before and hit the same issue).

While consuming, I can see the reactor-kafka code (in ConsumerEventLoop) pause all 5 partitions due to backpressure, then resume all 5 once the backpressure is relieved. While the partitions are paused, the Apache Kafka client skips fetching records for them, so the unconsumed records in those partitions are not drained. However, after the consumer has been paused and resumed many times, the log suggests that fetches are no longer being skipped for all of the partitions. Eventually 1 of the 5 partitions has its events drained and processed, but the other 4 partitions' offsets simply advance to the latest offset without their records ever being emitted by reactor-kafka.

Even after I restart the application, on partition assignment the positions of those 4 partitions initially point somewhere before the end offset, but as soon as fetching starts, all 4 positions jump to the end offset. As a result, the unprocessed messages in those 4 partitions are never drained. What went wrong, and how can I get the other 4 partitions' records drained instead of just having their offsets advanced?
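For a sense of scale, here is a rough back-of-the-envelope sketch (the numbers are taken from the description above; the arithmetic is mine, not from any library) of how long draining this backlog takes at the observed processing rate. It shows the unconsumed records sit in the topic for hours while the consumer churns through them:

```java
// Rough estimate of how long the consumer needs to drain the backlog.
// All inputs are assumptions taken from the problem description.
public class DrainTimeEstimate {
    public static void main(String[] args) {
        long totalRecords = 1_100_000L; // events produced in a row
        long perRecordMs = 50L;         // downstream processing time per message
        int concurrency = 3;            // flatMap concurrency used in the pipeline

        // Best case: 'concurrency' records in flight at once.
        long drainMs = totalRecords * perRecordMs / concurrency;
        double drainHours = drainMs / 3_600_000.0;

        System.out.printf("Estimated drain time: %d ms (~%.1f hours)%n", drainMs, drainHours);
    }
}
```

So even in the best case the backlog takes roughly five hours to drain, during which the partitions are repeatedly paused and resumed.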
Below is a simplified version of my KafkaReceiver.receive code:
reactiveKafkaConsumerTemplate
    .receive()
    .map(receiverRecord -> convertToCustomObject(receiverRecord))
    .flatMap(customObjectReceiverRecordTuple -> getDetails(customObjectReceiverRecordTuple._1), 3) // Network call, so can be slow
    .flatMap(detailedCustomObjectReceiverRecordTuple -> save(detailedCustomObjectReceiverRecordTuple._1), 3) // Network call, so can be slow
    .doOnNext(receiverRecord -> receiverRecord.receiverOffset().acknowledge())
    .map(receiverRecord -> true)
    .bufferTimeout(110, Duration.ofSeconds(5))
    .map(List::size)
    .doOnNext(integer -> log.info("Saved {} detailedCustomObjects", integer))
    .onErrorResume(throwable -> {
        log.error("Error encountered, resuming...", throwable);
        return Mono.empty();
    })
    .subscribe();
Here is a sample of the log from just before the consumer stops emitting/processing events:
r.k.r.internals.ConsumerEventLoop : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop : Consumer woken
r.k.r.internals.ConsumerEventLoop : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop : Paused - back pressure
MyReactiveConsumerClass : Saved 61 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop : Async committing: {topic-a-1=OffsetAndMetadata{offset=913053, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop : Consumer woken
r.k.r.internals.ConsumerEventLoop : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop : Paused - back pressure
MyReactiveConsumerClass : Saved 55 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop : Consumer woken
r.k.r.internals.ConsumerEventLoop : Async committing: {topic-a-1=OffsetAndMetadata{offset=913111, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop : Paused - back pressure
r.k.r.internals.ConsumerEventLoop : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop : Consumer woken
r.k.r.internals.ConsumerEventLoop : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop : Paused - back pressure
MyReactiveConsumerClass : Saved 64 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop : Async committing: {topic-a-1=OffsetAndMetadata{offset=913170, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop : Consumer woken
r.k.r.internals.ConsumerEventLoop : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop : Emitting 40 records, requested now 0
r.k.r.internals.ConsumerEventLoop : Paused - back pressure
MyReactiveConsumerClass : Saved 58 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop : Async committing: {topic-a-1=OffsetAndMetadata{offset=913226, leaderEpoch=null, metadata=''}}
r.k.r.internals.ConsumerEventLoop : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop : Consumer woken
r.k.r.internals.ConsumerEventLoop : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop : Emitting 19 records, requested now 0
r.k.r.internals.ConsumerEventLoop : Paused - back pressure
MyReactiveConsumerClass : Saved 52 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop : onRequest.toAdd 1, paused true
r.k.r.internals.ConsumerEventLoop : Consumer woken
r.k.r.internals.ConsumerEventLoop : Resumed partitions: [topic-a-0, topic-a-1, topic-a-2, topic-a-3, topic-a-4]
r.k.r.internals.ConsumerEventLoop : Async committing: {topic-a-1=OffsetAndMetadata{offset=913280, leaderEpoch=null, metadata=''}}
MyReactiveConsumerClass : Saved 31 detailedCustomObjects
r.k.r.internals.ConsumerEventLoop : Async committing: {topic-a-1=OffsetAndMetadata{offset=913295, leaderEpoch=null, metadata=''}}
Here is a sample of the Apache Kafka client log showing fetches being skipped because the partitions are paused:
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-3 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-1 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-4 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-2 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-0 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-3 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-1 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-4 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-2 because it is paused
o.a.k.c.c.internals.AbstractFetch : [Consumer clientId=consumer-uuid, groupId=consumerGroup] Skipping fetching records for assigned partition topic-a-0 because it is paused
Update: I found the issue. This GitHub comment explains the root cause very well: https://github.com/tulios/kafkajs/issues/1119#issuecomment-862983659. While the consumer lagged behind, the broker's retention policy deleted the old log segments, so the consumer's positions became out of range and were reset to the end offset, which is exactly the symptom I observed. You need to make sure the retention byte size is big enough to hold the backlog.
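As a sanity check, here is a hedged sketch comparing the estimated per-partition backlog against retention.bytes (which the broker enforces per partition). The average record size and the retention value below are illustrative assumptions, not values from my actual setup:

```java
// Compares an estimated per-partition backlog against retention.bytes.
// The record size and retention value are illustrative assumptions.
public class RetentionCheck {
    static boolean retentionTooSmall(long backlogRecords, long avgRecordBytes,
                                     int partitions, long retentionBytesPerPartition) {
        long backlogBytesPerPartition = backlogRecords * avgRecordBytes / partitions;
        return backlogBytesPerPartition > retentionBytesPerPartition;
    }

    public static void main(String[] args) {
        long backlogRecords = 1_100_000L;         // total backlog from the description
        long avgRecordBytes = 1_024L;             // assumed ~1 KiB per record
        int partitions = 5;                       // partitions in "topic-a"
        long retentionBytes = 100L * 1024 * 1024; // example: retention.bytes = 100 MiB

        // ~1.1M records * 1 KiB / 5 partitions ≈ 215 MiB per partition,
        // which exceeds the 100 MiB example retention: the broker would
        // delete unconsumed segments while the consumer lags behind.
        System.out.println("Retention too small: "
                + retentionTooSmall(backlogRecords, avgRecordBytes, partitions, retentionBytes));
    }
}
```

If this check trips for your real record sizes, raise the topic's retention.bytes (and/or retention.ms) so unconsumed segments survive long enough for a slow consumer to drain them.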