java, reactive-programming, reactive, reactor-kafka

How to fetch value from Flux<ConsumerRecord<K, V>> and call some method for every record


I'm new to reactive programming and trying to fetch records from a Flux of ConsumerRecords and forward each record for further processing.

@Slf4j
public class OMSKafkaConsumer<K, V> {

    private final KafkaReceiver<K, V> receiver;

    public OMSKafkaConsumer(KafkaConsumerConfig<K, V> consumerConfig) {
        final ReceiverOptions<K, V> receiverOptions = ReceiverOptions.<K, V>create(consumerConfig.consumerConfigs())
                .subscription(Collections.singleton(consumerConfig.getTopic()));
        receiver = KafkaReceiver.create(receiverOptions);
    }

    public Flux<ConsumerRecord<K, V>> receiveMessage() {
        final Flux<ConsumerRecord<K, V>> inboundFlux = receiver.receiveAtmostOnce();
        inboundFlux.subscribe(r -> log.info("Received message: %s \n", r));
        return inboundFlux;
    }

}

This is how I'm trying to access the records received from Kafka in my consumer class, but I'm getting an error at fromIterable because it only accepts an Iterable. I'm not sure how I can loop over the consumer records and process them in a reactive way.

return kafkaConsumer.receiveMessage()
        .map(consumerRecords -> Flux.fromIterable(consumerRecords)
                .doOnNext(() -> {
                    for (ConsumerRecord<K, V> r : consumerRecords)
                        processMessage(r.getValue());
                }));

Solution

  • This is because consumerRecords is a single ConsumerRecord, and thus cannot be passed to Flux.fromIterable. A Flux is already a collection of items, just with an unknown amount of time between the emitted items.

    You can solve this by calling the processMessage method directly: .map(consumerRecord -> processMessage(consumerRecord.value())) (note that ConsumerRecord exposes value(), not getValue()). This transforms the Flux into a Flux of whatever type processMessage returns. If processMessage is itself reactive (it returns a Flux or Mono), use .concatMap (if the records should be processed sequentially) or .flatMap (if they can be processed concurrently) instead, as sketched below.
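
    For illustration, a minimal sketch; processMessage and the result type R are placeholders for whatever your handler actually looks like:

    // Hypothetical plain handler: R processMessage(V value)
    Flux<R> processed = kafkaConsumer.receiveMessage()
            .map(record -> processMessage(record.value()));

    // Hypothetical reactive handler: Mono<R> processMessage(V value)
    // concatMap keeps the records in order; flatMap processes them concurrently.
    Flux<R> processedReactive = kafkaConsumer.receiveMessage()
            .concatMap(record -> processMessage(record.value()));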

    There is also a bug in your example: Flux.subscribe should not be called in the middle of a Flux pipeline. As written, whenever kafkaConsumer.receiveMessage() is called, the inner .subscribe starts the Flux and processes items even though the returned Flux has not been subscribed to yet, causing items to be processed twice. You can solve this by replacing it with .doOnNext:

    public Flux<ConsumerRecord<K, V>> receiveMessage() {
        // doOnNext only adds a side effect; the caller subscribes once.
        // SLF4J uses {} placeholders, not %s.
        return receiver.receiveAtmostOnce()
                .doOnNext(r -> log.info("Received message: {}", r));
    }
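
    With that change the caller subscribes exactly once, at the end of the whole pipeline. A sketch, reusing the hypothetical reactive processMessage from above:

    // Subscribe once at the end of the chain; call dispose() to stop consuming.
    Disposable subscription = kafkaConsumer.receiveMessage()
            .concatMap(record -> processMessage(record.value()))
            .subscribe();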