I have a very simple Reactor Kafka consumer.
However, in order for the message to be handled properly, couple of @Postconstruct should be executed beforehand.
The @Postconstruct will initialize some objects in memory, prepare data, etc...
If the messages start being consumed, but the @Postconstruct is not complete, the message handling will fail for sure.
The @Postconstruct needs some time to complete, roughly 3ish seconds. What I tried: Therefore, currently, in my code, I would just handle the messages, and for the first 3ish seconds, all messages processing will fail, and I would put the failed messages in another topic.
At some point, after the 3ish seconds, the @Postconstruct is complete, the messages are being processed normally, happy.
However, I found this very redundant, I have another topic and another job which existence is just here to re process the messages failed within those 3 seconds.
Is there a simpler way, to tell Reactor Kafka to only start consuming, only once the @Postconstruct is complete (maybe by sending some signal within the app), or maybe even just to wait 3ish seconds?
So far, I tried this:
public Flux<String> myConsumer() {
return KafkaReceiver
.create(receiverOptions)
.receive()
.delayUntil(a -> Mono.delay(Duration.ofSeconds(4)))
.flatMap((oneMessage) ->
Mono.deferContextual(contextView -> {
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
try (scope) {
return consume(oneMessage);
}
}), 500)
.name("greeting.call") //1
.tag("latency", "low") //2
.tap(Micrometer.observation(observationRegistry));
However, it is not doing what I expect, it is just delaying.
You should not subscribe()
to the flux in a post construct method. Instead, implement SmartLifecycle
and subscribe in start()
.
Or use an event listener and only subscribe after the context is refreshed.
That way, all beans will be fully wired up before you start to receive the records.