spring-webfluxproject-reactorreactive-kafka

Stop consuming from KafkaReceiver after a timeout


I have a common rest controller:

    private final KafkaReceiver<String, Domain> receiver;

    @GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Domain> produceFluxMessages() {
    return receiver.receive().map(ConsumerRecord::value)
          .timeout(Duration.ofSeconds(2));
    }

What I am trying to achieve is to collect messages from Kafka topic for a certain period of time, and then just stop consuming and consider this flux completed. If I remove timeout and open this in a browser, I am getting messages forever, downloading never stops. And with this timeout consuming stops after 2 seconds, but I'm getting an exception:

java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'map' (and no fallback has been configured)

Is there a way to successfully complete Flux after timeout?


Solution

  • There's multiple overloads of the timeout() method - you're using the standard one that throws an exception on timeout.

    Instead, just use the overloaded timeout method to provide an empty default publisher to fallback to:

    timeout(Duration.ofSeconds(2), Mono.empty())

    (Note in a general case you could explicitly capture the TimeoutException and fallback to an empty publisher using onErrorResume(TimeoutException.class, e -> Mono.empty()), but that's much less preferable to using the above option where possible.)