java error-handling spring-webflux blocking

Spring WebFlux: return a response when an error occurs in Mono.zip, but keep processing the other calls


I have a REST service (let's call it 'MAIN SERVICE') that calls other remote services. For performance reasons, I need MAIN SERVICE to answer immediately when one of the remote calls throws an error, but I still need to keep listening for the other calls' responses so I can log them or run verifications on them.

Basically, I have something like this. The 'MAIN SERVICE':

public ResponseEntity<SomeType> mainService() {
 Mono<A> remoteCallA = getRemoteCallA();
 Mono<B> remoteCallB = getRemoteCallB();

 SomeType result = Mono.zip(remoteCallA , remoteCallB)
  .doOnSuccess(...)
  .map(...)
  .block();

 return ResponseEntity.ok(result);
}

Another service that calls remote A and does some work with the result:

//TAKES 1 SECOND TO RESPOND
public Mono<A> getRemoteCallA() {
 return client.get()
  ...
  .doOnSuccess(response -> {
    //MANDATORY EXECUTION
  });
}

Another service that calls remote B, which throws an error:

//ERROR THROWN AFTER 500 MS
public Mono<B> getRemoteCallB() {
 return client.get()
  ...
  .doOnError(exception -> {
    //SOME LOGGING
  });
}

My problem is that call B fails before call A completes, and I want to return an HTTP response immediately. But then, when call A completes, its doOnSuccess is not triggered (nor is any other doOnXXX method).

I am not sure, but I think the zip stream is cancelled (I saw it with log()), so A's events are never triggered?

zipDelayError is not a solution, because then I would have to wait 1 second before responding with the error.


Solution

  • What seems to be working:

      @Test
      void so_78147735() throws InterruptedException {
    
        var monoA = Mono.fromSupplier(() -> delay(1000L, false, "A")).subscribeOn(Schedulers.boundedElastic());
        var monoB = Mono.fromSupplier(() -> delay(500L, true, "B")).subscribeOn(Schedulers.boundedElastic());
    
        var cachedA = monoA.cache();
        var cachedB = monoB.cache();
    
        var response = Mono.zip(cachedA, cachedB).subscribeOn(Schedulers.boundedElastic());
        var local = Mono.zip(cachedA.map(resultOfA -> resultOfA.replace("A", "SUCCESS")),
                             cachedB.onErrorReturn("FAIL"))
                        .subscribeOn(Schedulers.boundedElastic());
    
        response.subscribe(objects -> System.out.println("response mono: " + objects.getT1() + " " + objects.getT2()));
        local.subscribe(objects -> System.out.println("local mono: " + objects.getT1() + " " + objects.getT2()));
    
        // Wait a little longer than the slowest call (1 s) so both subscribers can finish
        Thread.sleep(1500L);
    
      }
    
      private String delay(Long delay, Boolean error, String id) {
        try {
          Thread.sleep(delay);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
    
        if (error) {
          throw new IllegalArgumentException(id);
        }
    
        System.out.println("delay method: " + id);
    
        return id;
      }
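
As far as I understand it, the cached version works because each call's result is shared and no longer tied to the fail-fast zip: when that zip is cancelled, the underlying call keeps running for the other subscriber. The same idea can be sketched with plain JDK `CompletableFuture` instead of Reactor (a swapped-in technique, used here only because it runs without extra dependencies). All names below (`FailFastSketch`, `remoteCall`, `run`) are hypothetical, and the delays mirror the 1 s / 500 ms from the question.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FailFastSketch {

    // Hypothetical stand-in for a remote call: sleeps, then fails or returns its id.
    static String remoteCall(long delayMs, boolean fail, String id) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (fail) {
            throw new IllegalArgumentException(id);
        }
        return id;
    }

    // Runs the scenario once and returns the observed events in order.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Both calls start immediately and run independently of each other.
            CompletableFuture<String> a =
                CompletableFuture.supplyAsync(() -> remoteCall(1000L, false, "A"), pool);
            CompletableFuture<String> b =
                CompletableFuture.supplyAsync(() -> remoteCall(500L, true, "B"), pool);

            // The "mandatory execution" hangs off A itself, not off the combined result.
            CompletableFuture<Void> mandatory =
                a.thenAccept(v -> events.add("A succeeded: " + v));

            // thenCombine alone waits for both calls, so forward the first failure by hand.
            CompletableFuture<String> response = a.thenCombine(b, (x, y) -> x + " " + y);
            a.whenComplete((v, e) -> { if (e != null) response.completeExceptionally(e); });
            b.whenComplete((v, e) -> { if (e != null) response.completeExceptionally(e); });

            try {
                events.add("response ok: " + response.join());
            } catch (CompletionException e) {
                // Reached after about 500 ms, while A is still running.
                events.add("response failed: " + e.getCause().getMessage());
            }

            // Only the sketch waits here; a real handler would already have returned.
            mandatory.join();
            return List.copyOf(events);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` should print `response failed: B` first and `A succeeded: A` about half a second later, which is the behaviour asked for in the question: the error is reported right away, and A's callback still fires.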