javaspring-bootapache-kafkaparallel-processingreactor-kafka

Reactor Kafka: message consumption always on one thread no matter the number of CPU from machine


Small question regarding Reactor Kafka please.

I am having a very straightforward Reactor Kafka project.

package com.example.micrometer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).run(args);
    }

    @Override
    public void run(String... args) {
    }

    @Bean
    Consumer<Flux<Message<String>>> consume() {
        return flux -> flux.flatMap(one -> myHandle(one) ).subscribe();
    }

    private Mono<String> myHandle(Message<String> one) {
        log.info("<==== look at this thread" + "\u001B[32m" + one.getPayload() + "\u001B[0m");
        String payload = one.getPayload();
        String decryptedPayload = complexInMemoryDecryption(payload); //this is NON blocking, takes 1 second
        String complexMatrix = convertDecryptedPayloadToGiantMatrix(decryptedPayload);  //this is NON blocking, takes 1 second
        String newMatrix = matrixComputation(complexMatrix); //this is NON blocking, takes 1 second
        return myNonBlockingReactiveRepository.save(complexMatrix);
    }

}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>streamreactiveconsumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.2</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2022.0.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

(Note, it is not a Spring Kafka project, not a Spring Cloud Stream project)

I am consuming from a topic with 3 partitions. The rate of the messages sent is one message per second.

The consumption and the processing of the message takes 3ish seconds second per message.

Important: please note the processing does not contain any blocking operation. It is a giant in memory decryption + giant matrix computation. It is BlockHound tested NON blocking.

Actual: When I consume the messages with project Reactor Kafka, the whole consumption happens on one thread only. Everything happens on container-0-C-1

Everything will happen on container-0-C-1, tested with hardware with 2 CPUs, 4 CPUs, 8 CPUs

2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :

Expected: We migrated from http webflux based to Kafka consumption based. The business logic did not change one bit.

On the Reactor Netty Spring webflux application, we could see processing happening from multiple thread, corresponding to the reactor cores. On a machine with many cores, this could keep up easily.

[or-http-epoll-1] [or-http-epoll-2] [or-http-epoll-3] [or-http-epoll-4]

The processing with just switch between any of those reactor-http-epoll-N. I could see when reactor-http-epoll-1 is handling the complex in memory computation for the first message, reactor-http-epoll-3 would handle the computation for the second message, etc... The parallelism is clear

I understand there are way to "scale" this application, but this a question in terms of Reactor Kafka itself.

I expect the messages would be handled in parallel. Some kind of container-0-C-1 for the first message, container-0-C-2 for the second message, etc...

How can I achieve that please? What am I missing?

Thank you


Solution

  • Typically in kafka consumers it's a good idea to separate polling cycle from processing logic. There is also I/O thread that is native to the KafkaConsumer. Sometimes this architecture is called "consumer with pipelining". In this architecture polling thread are continuously fetching records from kafka and then "feed" them to some bounded buffer/queue (i.e. ArrayBlockingQueue or LinkedBlockingQueue). On the other side processing threads take records from the queue and process them. It allows to separate decouple polling logic from the processing implementing buffering and backpreasure.

    Reactor Kafka is built on top of KafkaConsumer API and use similar architecture implementing reactive streams with backpreasure. KafkaReceiver provides polling cycle and by default, publishes fetched records on a Schedulers.single thread.

    Now, depends on your logic, you could process data and commit offsets sequentially or in parallel. For concurrent processing use flatMap that by default processes 256 records in parallel and could be controlled using concurrency parameter.

    kafkaReceiver.receive()
        .flatMap(rec -> proces(rec), concurrency)
    

    If you add logging, you would see that all records are received on kafka-receiver-2 but processed on different parallel-# threads. Note, that records are received in order per partition.

    12:50:08.347  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-2, partition: 0
    12:50:08.349  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-3, partition: 0
    12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-4, partition: 0
    12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-6, partition: 0
    12:50:08.351  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-9, partition: 0
    12:50:08.353  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-0, partition: 2
    12:50:08.354  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-8, partition: 2
    12:50:08.355  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-1, partition: 1
    12:50:08.356  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-5, partition: 1
    12:50:08.358  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-7, partition: 1
    12:50:09.353  [parallel-3] INFO [c.e.d.KafkaConsumerTest] - process: value-2, partition: 0
    12:50:09.353  [parallel-6] INFO [c.e.d.KafkaConsumerTest] - process: value-6, partition: 0
    12:50:09.353  [parallel-4] INFO [c.e.d.KafkaConsumerTest] - process: value-3, partition: 0
    12:50:09.353  [parallel-5] INFO [c.e.d.KafkaConsumerTest] - process: value-4, partition: 0
    12:50:09.355  [parallel-7] INFO [c.e.d.KafkaConsumerTest] - process: value-9, partition: 0
    12:50:09.360  [parallel-10] INFO [c.e.d.KafkaConsumerTest] - process: value-1, partition: 1
    12:50:09.360  [parallel-9] INFO [c.e.d.KafkaConsumerTest] - process: value-8, partition: 2
    12:50:09.360  [parallel-8] INFO [c.e.d.KafkaConsumerTest] - process: value-0, partition: 2
    12:50:09.361  [parallel-11] INFO [c.e.d.KafkaConsumerTest] - process: value-5, partition: 1
    12:50:09.361  [parallel-12] INFO [c.e.d.KafkaConsumerTest] - process: value-7, partition: 1
    

    In other words this is by design and you should not worry about polling logic. You can scale processing by increasing parallelism for flatMap.