java spring-boot apache-kafka reactor-kafka

Stop Kafka Consumer after consuming the log end offset using reactor kafka


I have a requirement to consume all messages from a topic, from the beginning up to the latest message offset in the partition, and then stop the consumer from listening to the topic. I have written the following code snippet:

ReceiverOptions<String, byte[]> receiverOptions = this.receiverOptions.subscription(Collections.singleton( topicName ) )
          .addAssignListener( partitions -> {
              log.debug( "starting to {} offset", startOffset );
              partitions.forEach( element -> element.seekToBeginning() );
          } )
          .addRevokeListener( partitions -> log.debug( "partition revoked = {}", partitions ) );

Flux<ReceiverRecord<String, byte[]>> fluxKafka = KafkaReceiver.create( receiverOptions ).receive();

disposable = fluxKafka.publishOn( Schedulers.fromExecutor( scheduledExecutor ) )
          .subscribe( record -> {

              log.debug( "received record [offset = {}, message = {}, timestamp = {}]", record.offset(),
                            new String( record.value() ), record.timestamp() );

              T message = avroDeserializer.deserialize( record.value() );

              Optional.ofNullable( onMessageHandler )
                            .ifPresent( handler -> handler.accept( message ) );

              record.receiverOffset().acknowledge();
          } );

I don't know which approach to take. I'm thinking of getting the current partition position(), but I'm not sure whether I should proceed with that.


Solution

  • I finally figured it out. I'm not familiar with Reactor, so it took me a while, but the code below works as expected:

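    public Disposable consumeMessages(TopicPartition topicPartition) {
        // Rewind every assigned partition to its first offset.
        ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topicPartition.topic()))
                .addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));
        KafkaReceiver<Integer, String> kafkaReceiver = KafkaReceiver.create(options);
        return kafkaReceiver
                .receive()
                // Pair each record with the current end offset of topicPartition.
                // Note: endOffsets is queried once per record; cache() only covers that per-record Mono.
                // Only topicPartition is checked, so records are assumed to come from that partition.
                .flatMap(record -> kafkaReceiver.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition))).cache()
                        .map(topicPartitionToLastOffset -> Tuples.of(record, topicPartitionToLastOffset.get(topicPartition))))
                // The end offset is the offset of the next record to be written, so the last record sits at end - 1.
                // takeUntil emits the matching record and then completes, which cancels the receiver.
                // If the partition is empty, no record arrives and this predicate never fires.
                .takeUntil(recordAndLastOffset -> recordAndLastOffset.getT1().offset() >= (recordAndLastOffset.getT2() - 1))
                .subscribe(record -> {
                    ReceiverOffset offset = record.getT1().receiverOffset();
                    System.out.printf("Received message: topic-partition=%s offset=%d key=%d value=%s\n",
                            offset.topicPartition(),
                            offset.offset(),
                            record.getT1().key(),
                            record.getT1().value());
                    offset.acknowledge();
                });
    }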
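
The stop condition above looks at a single partition. If the topic has several partitions, one way to extend the idea is to take a snapshot of the beginning and end offsets once, then track which partitions still have records left to read. The class below is only a sketch of that bookkeeping in plain Java: `CatchUpTracker`, `onRecord` and `isCaughtUp` are hypothetical names, not part of reactor-kafka, and the offset maps are assumed to come from `Consumer.beginningOffsets(...)` and `Consumer.endOffsets(...)`, keyed here by partition number for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks which partitions still have records below the captured end offset. */
final class CatchUpTracker {
    // partition number -> offset of the last record that still has to be consumed
    private final Map<Integer, Long> remaining = new HashMap<>();

    /**
     * @param beginningOffsets first available offset per partition
     * @param endOffsets       offset of the next record to be written per partition
     */
    CatchUpTracker(Map<Integer, Long> beginningOffsets, Map<Integer, Long> endOffsets) {
        endOffsets.forEach((partition, end) -> {
            long begin = beginningOffsets.getOrDefault(partition, 0L);
            if (end > begin) {
                // Non-empty partition: the last existing record sits at end - 1.
                remaining.put(partition, end - 1);
            }
            // Empty partitions are skipped, so they never block completion.
        });
    }

    /** Registers a consumed record; returns true once every tracked partition is caught up. */
    boolean onRecord(int partition, long offset) {
        Long last = remaining.get(partition);
        if (last != null && offset >= last) {
            remaining.remove(partition);
        }
        return remaining.isEmpty();
    }

    /** True when there is nothing (left) to consume, e.g. all partitions were empty at snapshot time. */
    boolean isCaughtUp() {
        return remaining.isEmpty();
    }
}
```

With something like this, the predicate could become `.takeUntil(r -> tracker.onRecord(r.partition(), r.offset()))`, and checking `isCaughtUp()` before subscribing would cover the case where every partition is empty. Records written after the snapshot are deliberately ignored, which differs from the code above, where the end offset is re-read for every record.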