Tags: java, reactive-programming, project-reactor, circuit-breaker, reactive-kafka

Reactor - Delay Flux elements in case of processing errors


My problem is similar to this question, which has no accepted answer. I have searched further and have not found a satisfactory answer.

I have a reactive Kafka consumer (Spring Reactor) with a poll amount 'x', and the application pushes the polled messages to a reactive endpoint using the reactive WebClient. The issue is that the external service's performance can vary over time, so I need the Kafka consumer to poll fewer messages (or apply backpressure) when the circuit breaker opens after a lot of failures. Is there a way in the current Reactor to automatically:

  1. React when the circuit breaker is in the open state and reduce the poll amount or slow down consumption.
  2. Restore the poll amount to its previous value when the circuit is closed (the external service would be scaled up if it goes down).

I do not want to use delayElements or delayUntil, since these are mostly static in nature and I want the application to react at runtime. How can I configure this end-to-end backpressure? I would provide the consumer values for the closed, partially closed, and open circuit states in the app configs.


Solution

  • Since backpressure is driven by how slowly the consumer processes items, one way to achieve this is to convert certain exception types into a delay. You can use onErrorResume for this purpose, as demonstrated below:

    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    public class SlowDownDemo {
    
        public static void main(String[] args) {
            new SlowDownDemo().run();
        }
    
        private void run() {
            long start = System.currentTimeMillis();
    
            Flux.range(1, 1000)
                    .doOnNext(item -> System.out.println("Elapsed " + (System.currentTimeMillis() - start) + " millis for item: " + item))
                    .flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
                    .blockLast();
    
            System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");
        }
    
        private Mono<Integer> process(Integer item) {
            // simulate error for some items
            if (item >= 50 && item <= 100) {
                return Mono.error(new RuntimeException("Downstream failed."));
            }
    
            // normal processing
            return Mono.delay(Duration.ofMillis(10))
                    .thenReturn(item);
        }
    
        private Mono<Integer> slowDown(Throwable e) {
            if (e instanceof RuntimeException) { // you could check for circuit breaker exception
                // wait, then complete empty: the failed item is dropped after the delay
                return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty());
            }
    
            return Mono.empty(); // no delay for other errors
        }
    }
    

    If you check the output of this code, you can see a slowdown for items 50 through 100, while processing runs at regular speed before and after.

    Note that my example does not use Kafka. Since you are using the reactor-kafka library, which honors backpressure, it should work the same way as this dummy example.

    Also, because the Flux may process items concurrently, the slowdown is not immediate: it will try to process some additional items before properly slowing down.
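
Since the question asks for a delay that adapts at runtime rather than a fixed one, the constant one-second delay in slowDown could be replaced by a value that grows with consecutive failures and resets on success. The following is only a minimal sketch of that idea using plain JDK classes. AdaptiveDelay, onFailure and onSuccess are hypothetical names introduced here for illustration; they are not part of Reactor or reactor-kafka.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not a Reactor API): the delay doubles with each
// consecutive failure, is capped at a maximum, and resets on success.
final class AdaptiveDelay {
    private static final int MAX_SHIFT = 20; // bounds the exponent

    private final Duration base;
    private final Duration max;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    AdaptiveDelay(Duration base, Duration max) {
        this.base = base;
        this.max = max;
    }

    // Records a failure and returns base * 2^(failures - 1), capped at max.
    Duration onFailure() {
        int failures = consecutiveFailures.updateAndGet(n -> Math.min(n + 1, MAX_SHIFT + 1));
        int shift = failures - 1;
        long baseMs = base.toMillis();
        // saturate instead of overflowing for very large base values
        long scaled = baseMs > (Long.MAX_VALUE >> shift) ? Long.MAX_VALUE : baseMs << shift;
        return Duration.ofMillis(Math.min(scaled, max.toMillis()));
    }

    // Resets the failure count so the next failure starts from the base delay again.
    void onSuccess() {
        consecutiveFailures.set(0);
    }
}
```

Under these assumptions, slowDown would pass the result of onFailure() to Mono.delay instead of Duration.ofMillis(1000), and a doOnNext on the inner process(item) Mono would call onSuccess(). Because flatMap runs several items concurrently, a burst of in-flight failures will raise the delay quickly, so the base and maximum values would need tuning against the configured concurrency.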