I'm wondering how to pause consumption from my Kafka topic when there is a problem with a client connection.
I want to use Retry and CircuitBreaker on a REST client, which is called after a message is consumed. The failing message will be retried in a loop, and during that time I want to pause consuming other messages from this topic.
Is this possible in the Quarkus framework? This is how the consumer currently looks:
@Slf4j
@ApplicationScoped
public class MyKafkaConsumer {

    @Inject
    MyService myService;

    @Incoming(KAFKA_CHANNEL)
    @ActivateRequestContext
    public CompletionStage<Void> receive(Message<String> message) {
        // Unexpected messages are acknowledged and skipped.
        if (!isItExpectedMessage(message)) {
            return message.ack();
        }
        // This is the call that should be retried while consumption is paused.
        myService.process(message);
        return message.ack();
    }
}
I cannot find any way to pause consumption and resume it around processing. Do you know of a way or workaround for this case?
Thank you in advance!
You can @Inject KafkaClientService, then call getConsumer(channel) on it, and call pause() on the result before calling myService and resume() after calling myService.
Remember that to use pause() / resume(), you need to disable automatic pause/resume, as described in the documentation:
mp.messaging.incoming.[channel].pause-if-no-requests=false
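To make the ordering concrete, here is a minimal, framework-free sketch of the pause / process / resume pattern described above. PausableConsumer, RecordingConsumer and processWhilePaused are hypothetical names made up for illustration; in a real Quarkus application the handle would be whatever getConsumer(channel) returns, and as far as I know its pause() and resume() are asynchronous there, so they would need to be awaited or chained rather than called as plain void methods. The point of the sketch is only that resume() belongs in a finally block, so the channel is not left paused if processing throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the consumer handle obtained via getConsumer(channel). */
interface PausableConsumer {
    void pause();
    void resume();
}

/** Simple double that records the order of calls so the sketch can be run without Kafka. */
class RecordingConsumer implements PausableConsumer {
    final List<String> events = new ArrayList<>();

    @Override
    public void pause() {
        events.add("pause");
    }

    @Override
    public void resume() {
        events.add("resume");
    }
}

class PauseResumeSketch {

    /** Pauses the consumer, runs the work, and always resumes, even if the work throws. */
    static void processWhilePaused(PausableConsumer consumer, Runnable work) {
        consumer.pause();
        try {
            // In the question this would be myService.process(message), including its retries.
            work.run();
        } finally {
            consumer.resume();
        }
    }

    public static void main(String[] args) {
        RecordingConsumer consumer = new RecordingConsumer();
        processWhilePaused(consumer, () -> consumer.events.add("process"));
        System.out.println(consumer.events); // prints [pause, process, resume]
    }
}
```

With the real consumer, the same structure would sit inside receive(...), around the myService.process(message) call.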