I have a common rest controller:
private final KafkaReceiver<String, Domain> receiver;
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Domain> produceFluxMessages() {
return receiver.receive().map(ConsumerRecord::value)
.timeout(Duration.ofSeconds(2));
}
What I am trying to achieve is to collect messages from Kafka topic for a certain period of time, and then just stop consuming and consider this flux completed. If I remove timeout and open this in a browser, I am getting messages forever, downloading never stops. And with this timeout consuming stops after 2 seconds, but I'm getting an exception:
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'map' (and no fallback has been configured)
Is there a way to successfully complete Flux after timeout?
There's multiple overloads of the timeout()
method - you're using the standard one that throws an exception on timeout.
Instead, just use the overloaded timeout method to provide an empty default publisher to fallback to:
timeout(Duration.ofSeconds(2), Mono.empty())
(Note in a general case you could explicitly capture the TimeoutException
and fallback to an empty publisher using onErrorResume(TimeoutException.class, e -> Mono.empty())
, but that's much less preferable to using the above option where possible.)