I have a problem similar to this question, which has no accepted answer. I have researched the topic and did not find a satisfactory answer.
I have a reactive Kafka consumer (Spring Reactor) with a poll amount 'x', and the application pushes the polled messages to a reactive endpoint using a reactive WebClient. The issue is that the external service can perform differently over time, so I have to adjust the Kafka consumer to poll fewer messages (or have backpressure kick in) when the circuit breaker opens because we see a lot of failures. Is there a way in the current Reactor to do this automatically?
I do not want to use delayElements or delayUntil since these are mostly static in nature, and I want the application to react at runtime. How can I configure this end-to-end backpressure? I would provide the values for consumers when the circuit is closed, partially closed and open in the app configs.
Since backpressure is driven by how slow the consumer is, one way to achieve this is to convert certain exception types into a delay. You can use the onErrorResume operator for this purpose, as demonstrated below:
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BackpressureDemo {

    public static void main(String[] args) {
        new BackpressureDemo().run();
    }

    private void run() {
        long start = System.currentTimeMillis();
        Flux.range(1, 1000)
            .doOnNext(item -> System.out.println("Elapsed " + (System.currentTimeMillis() - start) + " millis for item: " + item))
            // a delayed inner Mono keeps its slot occupied, so flatMap requests less from upstream
            .flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
            .blockLast();
        System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");
    }

    private Mono<Integer> process(Integer item) {
        // simulate error for some items
        if (item >= 50 && item <= 100) {
            return Mono.error(new RuntimeException("Downstream failed."));
        }
        // normal processing
        return Mono.delay(Duration.ofMillis(10))
            .thenReturn(item);
    }

    private Mono<Integer> slowDown(Throwable e) {
        if (e instanceof RuntimeException) { // you could check for circuit breaker exception
            return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
        }
        return Mono.empty(); // no delay for other errors
    }
}
If you check the output of this code, you can see a slowdown between items 50 and 100, while it runs at regular speed before and after.
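As a rough sanity check on that output, the expected run time can be estimated with simple arithmetic. The sketch below computes an idealized lower bound: it assumes the five flatMap slots are always busy and ignores scheduling and logging overhead. The class and method names are made up for illustration and are not part of Reactor.

```java
import java.time.Duration;

final class FlowTimeEstimate {

    /**
     * Idealized lower bound for the whole flow: the summed per-item latency
     * spread evenly over the concurrent slots.
     */
    static Duration lowerBound(long normalItems, Duration normalLatency,
                               long failingItems, Duration failureDelay,
                               int concurrency) {
        long totalMillis = normalItems * normalLatency.toMillis()
                + failingItems * failureDelay.toMillis();
        return Duration.ofMillis(totalMillis / concurrency);
    }

    public static void main(String[] args) {
        // Numbers from the demo: 1000 items, of which 51 (items 50..100) fail.
        Duration estimate = lowerBound(
                949, Duration.ofMillis(10),   // normal items, 10 ms each
                51, Duration.ofMillis(1000),  // failing items, 1000 ms delay each
                5);                           // flatMap concurrency
        System.out.println("Estimated lower bound: " + estimate.toMillis() + " ms");
    }
}
```

For the demo numbers this gives (949 × 10 ms + 51 × 1000 ms) / 5 = 12098 ms, of which about 10.2 s comes from the failing items, so a real run should take at least that long.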
Note that my example does not use Kafka. Since you are using the reactor-kafka library, which honors backpressure, it should work the same way as this dummy example.
Also, because the Flux may process items concurrently, the slowdown is not immediate: it will still process some additional items before properly slowing down.
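Since the question mentions supplying different values for the closed, partially closed and open states from app configs, the fixed 1000 ms delay could be replaced by a lookup per state. The following is only a minimal sketch of that idea: the CircuitState enum and the StateBasedDelay class are hypothetical, and how the current state is obtained depends on the circuit breaker library in use.

```java
import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;

final class StateBasedDelay {

    /** Hypothetical circuit states; a real circuit breaker library defines its own. */
    enum CircuitState { CLOSED, HALF_OPEN, OPEN }

    private final Map<CircuitState, Duration> delays = new EnumMap<>(CircuitState.class);

    /** The three durations would typically be read from application configuration. */
    StateBasedDelay(Duration closed, Duration halfOpen, Duration open) {
        delays.put(CircuitState.CLOSED, closed);
        delays.put(CircuitState.HALF_OPEN, halfOpen);
        delays.put(CircuitState.OPEN, open);
    }

    /** Returns the configured delay to apply after a failure in the given state. */
    Duration delayFor(CircuitState state) {
        return delays.get(state);
    }

    public static void main(String[] args) {
        // Example values only; tune them for the actual downstream service.
        StateBasedDelay policy = new StateBasedDelay(
                Duration.ofMillis(50), Duration.ofMillis(250), Duration.ofMillis(1000));
        for (CircuitState state : CircuitState.values()) {
            System.out.println(state + " -> " + policy.delayFor(state).toMillis() + " ms");
        }
    }
}
```

Inside slowDown, the result of delayFor could then be passed to Mono.delay instead of the hard-coded Duration.ofMillis(1000), so the flow slows down more the longer the downstream service keeps failing.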