spring-webfluxproject-reactor

Reactor skip step without Retry


I'm a little confused.

The problem is on making reactive call chain with retry. It goes to step 3 without save retried value on step 2. If I delete onErrorContinue it will retry save successefuly but I miss "onErrorContinue" functionality.

What I do:

  1. Input: Flux
  2. Call repository save for Model in flatMap with Retry on exception
  3. I need to skip SomeModel if it fails on 2 step. (actually it's reactive kafka listener)

just for example

            Flux.fromIterable(s) // actually from kafka
                    .flatMap(kk -> eventService.save(kk)
                        .doOnNext(it -> record.receiverOffset().acknowledge()))                
            .onErrorContinue((ex, record) -> {
                log.error("Exception during consumer", ex);
                if (record instanceof ReceiverRecord failedRecord &&
                        ex instanceof DeserializationException exception) {
                    log.error("Failed record details {} and exception {}", record, exception);
                    failedRecord.receiverOffset().acknowledge();
                } else {
                    log.error("Some unexpected error type! Cant move offset", ex);
                }
            })
            .doOnError(e -> log.error("Exception during consumer", e))
            .retry()
            .subscribe();

eventService.save looks like. And it works as excpected (do retry on exception)

public Mono<SomeModel> save(SomeModel someModel) {
    return Mono.defer(() -> repository.save(someModel))
            .retryWhen(Retry
                    .backoff(1, Duration.ofMillis(100))
                    .maxBackoff(Duration.ofMillis(5000)));

}

Solution

  • Actualy exception propagation helped me

    public Mono<SomeModel> save(SomeModel someModel) {
        return Mono.defer(() -> repository.save(someModel))
                .retryWhen(Retry
                        .backoff(1, Duration.ofMillis(100))
                        .maxBackoff(Duration.ofMillis(5000)))
                    .onErrorContinue((t, o) -> {
                        throw Exceptions.propagate(t);
                    }));