javaapache-kafkaspring-kafkareactor-kafka

Delay the start of message consumption


I have a very simple Reactor Kafka consumer.

However, in order for the message to be handled properly, couple of @Postconstruct should be executed beforehand.

The @Postconstruct will initialize some objects in memory, prepare data, etc...

If the messages start being consumed, but the @Postconstruct is not complete, the message handling will fail for sure.

The @Postconstruct needs some time to complete, roughly 3ish seconds. What I tried: Therefore, currently, in my code, I would just handle the messages, and for the first 3ish seconds, all messages processing will fail, and I would put the failed messages in another topic.

At some point, after the 3ish seconds, the @Postconstruct is complete, the messages are being processed normally, happy.

However, I found this very redundant, I have another topic and another job which existence is just here to re process the messages failed within those 3 seconds.

Is there a simpler way, to tell Reactor Kafka to only start consuming, only once the @Postconstruct is complete (maybe by sending some signal within the app), or maybe even just to wait 3ish seconds?

So far, I tried this:

  public Flux<String> myConsumer() {
        return KafkaReceiver
                .create(receiverOptions)
                .receive()
                .delayUntil(a -> Mono.delay(Duration.ofSeconds(4)))
                .flatMap((oneMessage) ->
                        Mono.deferContextual(contextView -> {
                            var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
                            try (scope) {
                                return consume(oneMessage);
                            }
                        }), 500)
                .name("greeting.call")      //1
                .tag("latency", "low")  //2
                .tap(Micrometer.observation(observationRegistry));

However, it is not doing what I expect, it is just delaying.


Solution

  • You should not subscribe() to the flux in a post construct method. Instead, implement SmartLifecycle and subscribe in start().

    Or use an event listener and only subscribe after the context is refreshed.

    That way, all beans will be fully wired up before you start to receive the records.