javaspring-bootspring-webfluxreactive-programming

Is possible to do this in Webflux? Call 2 services in parallel and do not wait for the second if a condition is met with the first


I want to call 2 services "a" and "b" and depending on the response of a, wait or not for b.

Something like this:

    public Mono<String> test() {

        Mono<String> monoA = Mono.fromCallable(() -> {
            try {
                log.info("A Started");
                Thread.sleep(2000);
                log.info("A Ended");
                return Math.random() > 0.5 ? "A" : "a";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).subscribeOn(Schedulers.boundedElastic());

        Mono<String> monoB = Mono.fromCallable(() -> {
            try {
                log.info("B Started");
                Thread.sleep(8000);
                log.info("B Ended");
                return "B";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).subscribeOn(Schedulers.boundedElastic());

        long startTime = System.currentTimeMillis();

        return monoA.flatMap(a -> {
            Mono<String> result;
            if(a.equals("A")) {
                result = Mono.just(a);
            } else {
                result = monoB;
            }
            return result;
        }).map(response -> {
            long totalTime = System.currentTimeMillis() - startTime;
            return "Returning '" + response + "' in: " + totalTime + " ms";
        });

    }

If monoA = "A" prints:

A Started  
A Ended  
Returning 'A' in: 2032 ms

If monoA = "a" prints:

A Started  
A Ended  
B Started  
B Ended  
Returning 'B' in: 10007 ms

if i use:
Mono.zip(monoA, monoB)
The best and worst case is always 8000ms

I want

if monoA = "A" response in 2000ms  
if monoA = "a" response in 8000ms

Is what I want possible or even right to do?


Solution

  • I believe that to achieve what you want, you need to start execution of monoB unconditionally. Please note .cache() for monoB, it prevents re-execution of monoB.

    Full code wrapped in a @ParameterizedTest-annotated method using StepVerifier.

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.test.StepVerifier;
    
    import static org.junit.jupiter.api.Assertions.assertTrue;
    
    public class MonoTest {
        public static final Logger log = LoggerFactory.getLogger(MonoTest.class);
    
        @ParameterizedTest
        @ValueSource(strings = {"a", "A"})
        void test(String monoAResult) {
            Mono<String> monoA = Mono.fromCallable(() -> {
                try {
                    log.info("A Started");
                    Thread.sleep(2000);
                    log.info("A Ended");
                    return monoAResult;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).subscribeOn(Schedulers.boundedElastic());
    
            Mono<String> monoB = Mono.fromCallable(() -> {
                try {
                    log.info("B Started");
                    Thread.sleep(8000);
                    log.info("B Ended");
                    return "B";
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).subscribeOn(Schedulers.boundedElastic()).cache();
    
            monoB.subscribe(); // start execution of monoB
    
            long startTime = System.currentTimeMillis();
    
            Mono<String> monoResult = monoA.flatMap(it -> "A".equals(it)
                    ? Mono.just(it)
                    : monoB);
    
            StepVerifier.create(monoResult).assertNext(response -> {
                        long elapsedTime = System.currentTimeMillis() - startTime;
    
                        if ("A".equals(monoAResult)) {
                            assertTrue(elapsedTime < 2100, "Execution time exceeded 2100 ms for result 'A'");
                        } else {
                            assertTrue(elapsedTime < 8100, "Execution time exceeded 8100 ms for result 'B'");
                        }
                    })
                    .verifyComplete();
        }
    }