I have a requirement where I will consume all message from a topic from the beginning to the latest message offset in the partition then stop the consumer from listening to the topic. I have created the following code snippet:
ReceiverOptions<String, byte[]> receiverOptions = this.receiverOptions.subscription(Collections.singleton( topicName ) )
.addAssignListener( partitions -> {
log.debug( "starting to {} offset", startOffset );
partitions.forEach( element -> element.seekToBeginning()
)
.addRevokeListener( partitions -> log.debug( "partition revoked = {}", partitions ) );
Flux<ReceiverRecord<String, byte[]>> fluxKafka = KafkaReceiver.create( receiverOptions ).receive();
disposable = fluxKafka.publishOn( Schedulers.fromExecutor( scheduledExecutor ) )
.subscribe( record -> {
log.debug( "received record [offset = {}, message = {}, timestamp = {}]", record.offset(),
new String( record.value() ), record.timestamp() );
T message = avroDeserializer.deserialize( record.value() );
Optional.ofNullable( onMessageHandler )
.ifPresent( handler -> handler.accept( message ) );
record.receiverOffset().acknowledge();
} );
I don't know what approach should I do.. I'm thinking of getting the current partition position() but I'm not sure if I should proceed with it..
So I finally figured it out, I'm not familiar with reactor so it took me a while, but the code below works as expected:
public Disposable consumeMessages(TopicPartition topicPartition) {
ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topicPartition.topic()))
.addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));
KafkaReceiver<Integer, String> kafkaReceiver = KafkaReceiver.create(options);
return kafkaReceiver
.receive()
.flatMap(record -> kafkaReceiver.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition))).cache()
.map(topicPartitionToLastOffset -> Tuples.of(record, topicPartitionToLastOffset.get(topicPartition))))
.takeUntil(recordAndLastOffset -> recordAndLastOffset.getT1().offset() >= (recordAndLastOffset.getT2()-1))
.subscribe(record -> {
ReceiverOffset offset = record.getT1().receiverOffset();
System.out.printf("Received message: topic-partition=%s offset=%d key=%d value=%s\n",
offset.topicPartition(),
offset.offset(),
record.getT1().key(),
record.getT1().value());
offset.acknowledge();
});
}