I'm a little confused.
The problem is in building a reactive call chain with retry. The chain moves on to step 3 without saving the retried value in step 2. If I remove onErrorContinue, the save is retried successfully, but then I lose the onErrorContinue functionality.
What I do (just as an example):
Flux.fromIterable(s) // actually from kafka
    .flatMap(kk -> eventService.save(kk)
        .doOnNext(it -> record.receiverOffset().acknowledge()))
    .onErrorContinue((ex, record) -> {
        log.error("Exception during consumer", ex);
        if (record instanceof ReceiverRecord failedRecord &&
                ex instanceof DeserializationException exception) {
            log.error("Failed record details {} and exception {}", record, exception);
            failedRecord.receiverOffset().acknowledge();
        } else {
            log.error("Some unexpected error type! Cant move offset", ex);
        }
    })
    .doOnError(e -> log.error("Exception during consumer", e))
    .retry()
    .subscribe();
eventService.save looks like this. On its own it works as expected (it retries on exception):
public Mono<SomeModel> save(SomeModel someModel) {
    return Mono.defer(() -> repository.save(someModel))
        .retryWhen(Retry
            .backoff(1, Duration.ofMillis(100))
            .maxBackoff(Duration.ofMillis(5000)));
}
Actually, propagating the exception helped me:
public Mono<SomeModel> save(SomeModel someModel) {
    return Mono.defer(() -> repository.save(someModel))
        .retryWhen(Retry
            .backoff(1, Duration.ofMillis(100))
            .maxBackoff(Duration.ofMillis(5000)))
        // Rethrow inside onErrorContinue so the error is propagated instead of the element being dropped
        .onErrorContinue((t, o) -> {
            throw Exceptions.propagate(t);
        });
}
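For comparison, Reactor also has onErrorStop(), which as far as I understand is meant for scoping a downstream onErrorContinue away from an inner chain. Below is a minimal sketch of that idea, not a drop-in replacement for my code: flakySave is a hypothetical stand-in for repository.save that fails on its first call, and the class name, the String element type and the empty onErrorContinue handler are assumptions made only for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

class OnErrorStopSketch {
    // Counts how many times the hypothetical save was attempted
    static final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical stand-in for repository.save: the first attempt throws inside map
    static Mono<String> flakySave(String value) {
        return Mono.just(value).map(v -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("transient failure");
            }
            return "saved:" + v;
        });
    }

    // Same shape as save(...) above, but scoped with onErrorStop() so that
    // operators upstream of it treat errors as terminal and retryWhen sees them
    static Mono<String> save(String value) {
        return Mono.defer(() -> flakySave(value))
            .retryWhen(Retry
                .backoff(1, Duration.ofMillis(100))
                .maxBackoff(Duration.ofMillis(5000)))
            .onErrorStop();
    }

    // Outer chain that still uses onErrorContinue, as in the question
    static List<String> run() {
        return Flux.just("a")
            .flatMap(OnErrorStopSketch::save)
            .onErrorContinue((ex, o) -> { /* would log and acknowledge here */ })
            .collectList()
            .block();
    }
}
```

Calling OnErrorStopSketch.run() exercises the outer onErrorContinue together with the inner retry. I have not checked how this interacts with the Kafka receiver, so treat it as something to try rather than a confirmed fix.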