java apache-kafka quarkus

Pause and resume a Quarkus MicroProfile Reactive Messaging Kafka consumer


I'm wondering how to pause consumption from my Kafka topic when there is a problem with a client connection.

I want to use Retry and CircuitBreaker on a REST client that is called after a message is consumed. A failing message will therefore be retried in a loop, and during that time I want to pause consuming other messages from the same topic.

Is this possible in the Quarkus framework? This is how the consumer currently looks:

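@Slf4j
@ApplicationScoped
public class MyKafkaConsumer {

    @Inject
    MyService myService;

    @Incoming(KAFKA_CHANNEL)
    @ActivateRequestContext
    public CompletionStage<Void> receive(Message<String> message) {
        if (!isItExpectedMessage(message)) {
            // Unexpected message: acknowledge it and skip processing.
            return message.ack();
        }
        // This call goes through the REST client guarded by Retry / CircuitBreaker.
        myService.process(message);
        // Acknowledge only after processing has completed.
        return message.ack();
    }
}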

I cannot find any way to pause consumption while a message is being processed and resume it afterwards. Do you know a solution or workaround for this case?

Thank you in advance!


Solution

  • You can @Inject KafkaClientService, call getConsumer(channel) on it, and then call pause() on the returned consumer before invoking myService and resume() once myService has finished.

    To control pause() / resume() yourself, you need to disable the automatic pause/resume behaviour, as described in the documentation (replace [channel] with your channel name):

    mp.messaging.incoming.[channel].pause-if-no-requests=false
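
    The control flow of the pause/resume approach above can be sketched in plain Java. This is only a minimal illustration under assumptions: PausableConsumer and MessageProcessor are hypothetical stand-ins (not Quarkus or SmallRye types) for the consumer returned by getConsumer(channel) and for MyService. In a real application the pause() and resume() calls on the Kafka consumer may be asynchronous, so their results would have to be awaited or subscribed to before continuing. The point of the sketch is the try/finally, which makes sure consumption resumes even when processing ultimately fails after all retries.

```java
import java.util.ArrayList;
import java.util.List;

final class PauseResumeSketch {

    /** Hypothetical stand-in for the consumer obtained via getConsumer(channel). */
    interface PausableConsumer {
        void pause();
        void resume();
    }

    /** Hypothetical stand-in for MyService from the question. */
    interface MessageProcessor {
        void process(String payload);
    }

    /** Pauses the consumer, runs the (possibly retried) processing, and always resumes. */
    static void processWhilePaused(PausableConsumer consumer,
                                   MessageProcessor processor,
                                   String payload) {
        consumer.pause();
        try {
            processor.process(payload);
        } finally {
            // Runs on success and on failure, so the channel is never left paused.
            consumer.resume();
        }
    }

    /** Simple consumer double that records the order of calls, for demonstration only. */
    static final class RecordingConsumer implements PausableConsumer {
        final List<String> events = new ArrayList<>();

        @Override
        public void pause() {
            events.add("pause");
        }

        @Override
        public void resume() {
            events.add("resume");
        }
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer();
        processWhilePaused(consumer, payload -> consumer.events.add("process:" + payload), "hello");
        System.out.println(consumer.events); // prints [pause, process:hello, resume]
    }
}
```

    If processing throws, the exception still propagates to the caller after resume() has run, so the message can be nacked or handled according to the channel's failure strategy.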