Tags: apache-kafka, kafka-consumer-api, spring-kafka, project-reactor, reactive-kafka

Multiple Kafka configurations in a reactive Kafka project


I am working on a project that has two services; each reads messages from Kafka, transforms them, and then writes them to another Kafka topic. The Kafka configurations for the two services are different. Here is my application.yml:

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        # Application-specific topic names; sourcetopic1 is injected with @Value in KafkaConfig.
        sourcetopic1: topic1
        destinationtopic1: topic2
        sourcetopic2: topic3
        destinationtopic2: topic4
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        consumer:
          # Both consumer templates are built from these shared consumer properties.
          group-id: TestCollector
          client-id: TestCollector01
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

These are my config files for both the services:

  1. Service1KafkaConfig

     /**
      * Kafka wiring for the first service: receiver options plus the reactive consumer template
      * built on top of them.
      */
     public class KafkaConfig {

         /**
          * Creates receiver options from the Spring Boot consumer properties and subscribes them to
          * the topic configured under {@code spring.kafka.sourcetopic1}.
          *
          * @param topic           the topic name resolved from {@code spring.kafka.sourcetopic1}
          * @param kafkaProperties the Spring Boot Kafka properties used to build the consumer config
          * @return receiver options subscribed to the single given topic
          */
         @Bean
         public ReceiverOptions<String, String> kafkaReceiverOptions(
                 @Value("${spring.kafka.sourcetopic1}") String topic,
                 KafkaProperties kafkaProperties) {
             return ReceiverOptions.<String, String>create(kafkaProperties.buildConsumerProperties())
                     .subscription(Collections.singletonList(topic));
         }

         /**
          * Wraps the given receiver options in a reactive consumer template.
          *
          * @param kafkaReceiverOptions the receiver options the template consumes with
          * @return a new consumer template backed by {@code kafkaReceiverOptions}
          */
         @Bean
         public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(
                 ReceiverOptions<String, String> kafkaReceiverOptions) {
             return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
         }
     }

  2. Service2Config

     /**
      * Kafka wiring for the second service: receiver options plus a reactive consumer template.
      */
     public class Service2KafkaConfig {
         // Builds receiver options from the shared Spring Boot consumer properties and subscribes
         // them to the single topic resolved from the @Value placeholder.
         // NOTE(review): the application.yml shown with this code defines sourcetopic1 and
         // sourcetopic2 but no spring.kafka.sourcetopic3 - confirm which key is intended.
         @Bean
         public ReceiverOptions<String, String> service2KafkaReceiverOptions(@Value("${spring.kafka.sourcetopic3}") String topic, KafkaProperties kafkaProperties) {
             ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
             return basicReceiverOptions.subscription(Collections.singletonList(topic));
         }
    
     // Wraps the injected receiver options in a reactive consumer template.
     // NOTE(review): the parameter is named kafkaReceiverOptions, which is the bean method name in
     // KafkaConfig, not service2KafkaReceiverOptions declared above. With two ReceiverOptions beans
     // in the context, Spring presumably falls back to the parameter name and injects the topic1
     // options here - verify, and consider @Qualifier("service2KafkaReceiverOptions").
     @Bean
     public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
         return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
     }
    

    }

I autowire these beans in the respective services:

Service1: I haven't included the ProcessRecord method for Service1 because I don't think it is relevant to this issue. Please let me know if it is needed.

  /**
   * Consumes records through the first service's reactive Kafka consumer template once the
   * application has started.
   */
  @Slf4j
  @Service
  public class Service1 implements CommandLineRunner {
      // Two ReactiveKafkaConsumerTemplate beans are declared and neither is named like this field,
      // so select the one defined in KafkaConfig explicitly by bean name.
      @Autowired
      @Qualifier("kafkaConsumerTemplate")
      public ReactiveKafkaConsumerTemplate<String, String> service1KafkaConsumerTemplate;


      /**
       * Builds the consuming pipeline: logs every received record, passes it to
       * {@code ProcessRecord}, and then emits the record's value.
       *
       * @return the stream of record values; {@link #run(String...)} subscribes to it
       */
      public Flux<String> consume1() {
          return service1KafkaConsumerTemplate.receiveAutoAck()
                  .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                          consumerRecord.key(),
                          consumerRecord.value(),
                          consumerRecord.topic(),
                          consumerRecord.offset())
                  )
                  // Was misspelled "donOnNext", which is not a Flux operator.
                  .doOnNext(s -> ProcessRecord(s))
                  .map(ConsumerRecord::value)
                  .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric1[].class.getSimpleName(), metric))
                  .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));

      }

      /**
       * Starts consumption by subscribing to {@link #consume1()}.
       *
       * @param args command-line arguments; not used
       */
      @Override
      public void run(String... args) throws Exception {
          consume1().subscribe();
      }
  }

Service2:

@Slf4j
@Service
public class Service2 implements CommandLineRunner {
    @Autowired
    @Qualifier("service2KafkaConsumerTemplate")
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate;


    public Flux<String> consume2() {
        return service2KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));

    }


    @Override
    public void run(String... args) throws Exception {
        consume2().subscribe();
    }
}

When I run the application, I can see only one of the consumers starting, subscribed to topic1. Is it possible to have multiple Kafka consumers running in the same project? If so, could you please let me know what needs to be done to get them both running?


Solution

  • It looks like they are both using the same topic and configuration; if there is only one partition and they are in the same consumer group, only one of them will get any data.

    If you want them both to get the same data, you must put them in different consumer groups.

    This works as expected:

    /**
     * Starts a reactive consumer in consumer group {@code group1} on topic {@code INPUT_1}.
     * Each record is printed with the prefix "one: " and its offset is acknowledged.
     */
    @Bean
    public ApplicationRunner runner1() {
        return args -> {
            // Plain consumer configuration; the deserializers are supplied on the options below.
            Map<String, Object> consumerConfig = Map.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ConsumerConfig.GROUP_ID_CONFIG, "group1",
                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            ReceiverOptions<String, String> receiverOptions =
                    ReceiverOptions.<String, String> create(consumerConfig)
                            .withKeyDeserializer(new StringDeserializer())
                            .withValueDeserializer(new StringDeserializer())
                            .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                            .commitBatchSize(1)
                            .subscription(Collections.singletonList("INPUT_1"));

            KafkaReceiver.create(receiverOptions)
                    .receive()
                    .doOnNext(rec -> {
                        System.out.println("one: " + rec.value() + "@" + rec.offset());
                        rec.receiverOffset().acknowledge();
                    })
                    .doOnError(error -> System.out.println(error))
                    .subscribe();
        };
    }
    
    /**
     * Starts a reactive consumer in consumer group {@code group2} on topic {@code INPUT_1}.
     * Each record is printed with the prefix "two: " and its offset is acknowledged.
     */
    @Bean
    public ApplicationRunner runner2() {
        return args -> {
            // Plain consumer configuration; the deserializers are supplied on the options below.
            Map<String, Object> consumerConfig = Map.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ConsumerConfig.GROUP_ID_CONFIG, "group2",
                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            ReceiverOptions<String, String> receiverOptions =
                    ReceiverOptions.<String, String> create(consumerConfig)
                            .withKeyDeserializer(new StringDeserializer())
                            .withValueDeserializer(new StringDeserializer())
                            .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                            .commitBatchSize(1)
                            .subscription(Collections.singletonList("INPUT_1"));

            KafkaReceiver.create(receiverOptions)
                    .receive()
                    .doOnNext(rec -> {
                        System.out.println("two: " + rec.value() + "@" + rec.offset());
                        rec.receiverOffset().acknowledge();
                    })
                    .doOnError(error -> System.out.println(error))
                    .subscribe();
        };
    }
    
    one: foo@16
    two: foo@16