I'm quite new to the reactive world, using Spring WebFlux + Reactor Kafka.
kafkaReceiver
    .receive()
    // .publishOn(Schedulers.boundedElastic())
    .doOnNext(a -> log.info("Reading message: {}", a.value()))
    .concatMap(kafkaRecord ->
        // perform DB operation
        // kafkaRecord.receiverOffset().acknowledge()
    )
    .doOnError(e -> log.error("Error", e))
    .retry()
    .subscribe();
I understood that in order to parallelise message consumption, I have to instantiate one KafkaReceiver per partition. But is it possible/recommended for a partition to read messages synchronously and process them asynchronously (including the manual acknowledge)?
So that this is the desired output:
Reading message:1
Reading message:2
Reading message:3
Reading message:4
Stored message 1 in DB + ack
Reading message:5
Stored message 2 in DB + ack
Stored message 5 in DB + ack
Stored message 3 in DB + ack
Stored message 4 in DB + ack
In case of errors, I'm thinking of publishing the record to a DLT.
I've tried with flatMap too, but the entire processing seems to happen sequentially on a single thread. If I publish on a new scheduler, the processing happens on a new, but still single, thread. If what I'm asking is possible, can someone please help me with a code snippet?
Reactor Kafka is built on top of the KafkaConsumer API. The polling cycle is separated from the processing logic by backpressure, pausing the consumer if required. By default, KafkaReceiver publishes fetched records on a Schedulers.single thread.
Now, depending on your logic, you can process data and commit offsets sequentially or in parallel. For concurrent processing you can use flatMap, which by default processes Queues.SMALL_BUFFER_SIZE = 256 messages concurrently. You can control the concurrency with flatMap(item -> process(item), concurrency), or use the concatMap operator if you want to process sequentially. Check flatMap(..., int concurrency, int prefetch) for details.
kafkaReceiver.receive()
    .flatMap(rec -> process(rec), concurrency);
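To illustrate how flatMap's concurrency argument interleaves processing while consumption stays sequential, here is a minimal, self-contained sketch using plain reactor-core. No Kafka involved: Flux.range stands in for kafkaReceiver.receive(), and the per-record delays are made up to simulate a DB write.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapConcurrencyDemo {

    // Simulated "DB write": completes after a record-dependent delay.
    static Mono<Integer> process(int record) {
        return Mono.delay(Duration.ofMillis(50L * (record * 7 % 5))).thenReturn(record);
    }

    public static List<Integer> run() {
        List<Integer> completionOrder = new CopyOnWriteArrayList<>();
        Flux.range(1, 5)                                  // stands in for kafkaReceiver.receive()
            .doOnNext(r -> System.out.println("Reading message: " + r))
            .flatMap(FlatMapConcurrencyDemo::process, 5)  // up to 5 records in flight
            .doOnNext(r -> {
                completionOrder.add(r);
                System.out.println("Stored message " + r + " in DB + ack");
            })
            .blockLast();
        return completionOrder;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because consumption is near-instant, the "Reading message" lines print first and the "Stored message" lines arrive in completion order rather than arrival order, which is the behaviour asked for in the question.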
Depending on use case there are several options.
Ordered message processing
Use this when message order is important and messages should be processed in the same sequence as the producer sent them. Kafka guarantees message order per partition. In Reactor Kafka you can achieve this by grouping records per partition and then processing each group sequentially:
kafkaReceiver.receive()
.groupBy(message -> message.receiverOffset().topicPartition())
.flatMap(partitions -> partitions.concatMap(this::process));
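To see that this groupBy + concatMap pattern preserves order within each group while groups still run in parallel, here is a small reactor-core sketch. No Kafka involved: the parity of a number stands in for the topic partition, and the delays are invented.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GroupByOrderDemo {

    public static Map<Integer, List<Integer>> run() {
        Map<Integer, List<Integer>> perPartition = new ConcurrentHashMap<>();
        Flux.range(1, 10)
            .groupBy(n -> n % 2)                          // stands in for topicPartition()
            .flatMap(group -> group.concatMap(n ->
                // simulated per-record processing with varying latency
                Mono.delay(Duration.ofMillis((n % 3) * 10L)).thenReturn(n)
                    .doOnNext(v -> perPartition
                        .computeIfAbsent(group.key(), k -> new CopyOnWriteArrayList<>())
                        .add(v))))
            .blockLast();
        return perPartition;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Despite the varying latencies, each group's list comes out in ascending order, because concatMap subscribes to the next inner Mono only after the previous one completes.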
Unordered message processing
If the sequence is not important and messages can be processed in any order, we can increase throughput by processing multiple messages in parallel.
kafkaReceiver.receive()
.flatMap(message -> process(message), concurrency);
Reactor Kafka supports out-of-order commits: the framework will defer the commits as needed until any "gaps" are filled. This removes the need for applications to keep track of offsets and commit them in the right order.
In general, unordered message processing will support much higher throughput on a small number of partitions. For ordered message processing you would need to increase the number of partitions to increase throughput. You can find some additional considerations here: how to increase message throughput.
Let's consider an example where the processing logic makes a remote HTTP call using the reactive WebClient.
private Mono<Void> process(ReceiverRecord<String, String> rec) {
return webClient.post()
.uri("https://postman-echo.com/post")
.retrieve()
.toBodilessEntity()
.doOnNext(res -> log.info("processed: {}, partition: {}", rec.value(), rec.partition()))
.then();
}
If you add logging, you will see that all records are received on kafka-receiver-2 but processed on different reactor-http-nio-# threads. Note that records are received in order per partition and respect the concurrency. Here is a log with concurrency = 5.
20:46:31.003 [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-2, partition: 0
20:46:31.572 [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-7, partition: 0
20:46:31.574 [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-8, partition: 0
20:46:31.576 [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-0, partition: 2
20:46:31.579 [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-1, partition: 2
20:46:31.853 [reactor-http-nio-4] INFO [c.e.d.KafkaConsumerTest] - processed: value-8, partition: 0
20:46:31.853 [reactor-http-nio-6] INFO [c.e.d.KafkaConsumerTest] - processed: value-1, partition: 2
20:46:31.853 [reactor-http-nio-2] INFO [c.e.d.KafkaConsumerTest] - processed: value-2, partition: 0
20:46:31.853 [reactor-http-nio-3] INFO [c.e.d.KafkaConsumerTest] - processed: value-7, partition: 0
20:46:31.853 [reactor-http-nio-5] INFO [c.e.d.KafkaConsumerTest] - processed: value-0, partition: 2
20:46:31.854 [reactor-http-nio-4] INFO [c.e.d.KafkaConsumerTest] - receive: value-4, partition: 2
20:46:31.857 [reactor-http-nio-4] INFO [c.e.d.KafkaConsumerTest] - receive: value-5, partition: 2
20:46:31.859 [reactor-http-nio-4] INFO [c.e.d.KafkaConsumerTest] - receive: value-9, partition: 2
20:46:31.862 [reactor-http-nio-4] INFO [c.e.d.KafkaConsumerTest] - receive: value-3, partition: 1
20:46:31.864 [reactor-http-nio-4] INFO [c.e.d.KafkaConsumerTest] - receive: value-6, partition: 1
20:46:31.908 [reactor-http-nio-2] INFO [c.e.d.KafkaConsumerTest] - processed: value-9, partition: 2
20:46:31.908 [reactor-http-nio-3] INFO [c.e.d.KafkaConsumerTest] - processed: value-5, partition: 2
20:46:31.913 [reactor-http-nio-5] INFO [c.e.d.KafkaConsumerTest] - processed: value-3, partition: 1
20:46:31.913 [reactor-http-nio-6] INFO [c.e.d.KafkaConsumerTest] - processed: value-6, partition: 1
20:46:31.937 [reactor-http-nio-4] INFO [c.e.d.KafkaConsumerTest] - processed: value-4, partition: 2
Different reactive clients internally use different thread pools. For example, for reactive Redis (Lettuce) it would be lettuce-nioEventLoop-#, and for reactive Mongo, nioEventLoopGroup-#.
It's important to note (thanks @PatPanda for the correction) that if you don't have any async logic (delays, HTTP calls, or other I/O-bound reactive code), subscription and execution will continue on the same thread.
To switch to another Scheduler we need to add .subscribeOn (e.g. .subscribeOn(Schedulers.parallel()) or .subscribeOn(Schedulers.boundedElastic())) to the process function.
kafkaReceiver.receive()
    .flatMap(rec -> process(rec).subscribeOn(Schedulers.parallel()));
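For example, if process wraps a blocking call (like the JDBC-style DB write from the question), subscribeOn(Schedulers.boundedElastic()) moves it off the receiver thread. Here is a minimal reactor-core sketch; blockingStore is a made-up stand-in for the DB write, and Flux.range stands in for kafkaReceiver.receive().

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnDemo {

    // Hypothetical blocking "DB write" (e.g. JDBC); returns the worker thread name.
    static String blockingStore(int record) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    public static List<String> run() {
        return Flux.range(1, 3)                           // stands in for kafkaReceiver.receive()
            .flatMap(rec -> Mono.fromCallable(() -> blockingStore(rec))
                // subscribeOn moves the blocking call off the receiver thread
                .subscribeOn(Schedulers.boundedElastic()))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every element is processed on a boundedElastic-* worker, so slow blocking writes no longer stall the consuming thread.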
or configure the Supplier for the Scheduler on which records will be published:
ReceiverOptions<K, V> subscription = ReceiverOptions.<K, V>create(consumerProperties)
.schedulerSupplier(() -> Schedulers.newParallel("kafka-receiver", 3, false))
.subscription(topics);
var receiver = KafkaReceiver.create(subscription);
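The effect of such a scheduler supplier can be sketched with plain reactor-core: publishing on Schedulers.newParallel("kafka-receiver", 3, false) makes downstream operators run on threads named kafka-receiver-#. No Kafka involved here; publishOn stands in for what the receiver does internally with the supplied scheduler.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerSupplierDemo {

    public static String run() {
        // Same scheduler as in the schedulerSupplier above: 3 workers, non-daemon
        Scheduler s = Schedulers.newParallel("kafka-receiver", 3, false);
        try {
            return Flux.just("rec")
                .publishOn(s)                             // records would be published on this scheduler
                .map(v -> Thread.currentThread().getName())
                .blockLast();
        } finally {
            s.dispose();                                  // non-daemon threads must be disposed
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```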