I want to call two services, "a" and "b", and, depending on the response from a, either wait for b or not.
Something like this:
public Mono<String> test() {
    Mono<String> monoA = Mono.fromCallable(() -> {
        try {
            log.info("A Started");
            Thread.sleep(2000);
            log.info("A Ended");
            return Math.random() > 0.5 ? "A" : "a";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).subscribeOn(Schedulers.boundedElastic());
    Mono<String> monoB = Mono.fromCallable(() -> {
        try {
            log.info("B Started");
            Thread.sleep(8000);
            log.info("B Ended");
            return "B";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).subscribeOn(Schedulers.boundedElastic());
    long startTime = System.currentTimeMillis();
    return monoA.flatMap(a -> {
        Mono<String> result;
        if (a.equals("A")) {
            result = Mono.just(a);
        } else {
            result = monoB;
        }
        return result;
    }).map(response -> {
        long totalTime = System.currentTimeMillis() - startTime;
        return "Returning '" + response + "' in: " + totalTime + " ms";
    });
}
If monoA returns "A", it prints:
A Started
A Ended
Returning 'A' in: 2032 ms
If monoA returns "a", it prints:
A Started
A Ended
B Started
B Ended
Returning 'B' in: 10007 ms
If I use:
Mono.zip(monoA, monoB)
the total time is always 8000 ms, in both the best and the worst case.
What I want is:
if monoA returns "A", a response in 2000 ms
if monoA returns "a", a response in 8000 ms
Is what I want possible, or even the right thing to do?
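I believe that to achieve what you want, you need to start the execution of monoB unconditionally, before the result of monoA is known. Please note the .cache() on monoB: it replays the cached result to later subscribers, which prevents monoB from being executed again when flatMap subscribes to it.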
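The same "start both eagerly, wait for b only when needed" idea can be illustrated without Reactor. The following is only a rough analogy using the JDK's CompletableFuture, which begins running as soon as it is created (comparable to a Mono that has been cached and subscribed up front). The names EagerStartSketch, slowCall and callBoth are hypothetical, and the delays are shortened to 300 ms and 900 ms so it runs quickly:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EagerStartSketch {

    // Daemon threads so the JVM can exit without an explicit shutdown.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical stand-in for a remote call: sleeps, then returns the given value.
    static CompletableFuture<String> slowCall(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return value;
        }, POOL);
    }

    // Starts a and b at the same time; waits for b only when a did not return "A".
    static CompletableFuture<String> callBoth(String aResult, long aMillis, long bMillis) {
        CompletableFuture<String> a = slowCall(aResult, aMillis);
        CompletableFuture<String> b = slowCall("B", bMillis); // already running at this point
        return a.thenCompose(r -> "A".equals(r)
                ? CompletableFuture.completedFuture(r)
                : b);
    }

    public static void main(String[] args) {
        for (String aResult : new String[] {"A", "a"}) {
            long start = System.nanoTime();
            String response = callBoth(aResult, 300, 900).join();
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println("Returning '" + response + "' in about " + elapsed + " ms");
        }
    }
}
```

As with the cached Mono, b keeps running in the background even when its result is not used, so this trades some wasted work for lower latency in the "a" case.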
Full code, wrapped in a @ParameterizedTest-annotated method that uses StepVerifier:
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MonoTest {
    public static final Logger log = LoggerFactory.getLogger(MonoTest.class);

    @ParameterizedTest
    @ValueSource(strings = {"a", "A"})
    void test(String monoAResult) {
        Mono<String> monoA = Mono.fromCallable(() -> {
            try {
                log.info("A Started");
                Thread.sleep(2000);
                log.info("A Ended");
                return monoAResult;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).subscribeOn(Schedulers.boundedElastic());
        Mono<String> monoB = Mono.fromCallable(() -> {
            try {
                log.info("B Started");
                Thread.sleep(8000);
                log.info("B Ended");
                return "B";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).cache();
        monoB.subscribe(); // start execution of monoB
        long startTime = System.currentTimeMillis();
        Mono<String> monoResult = monoA.flatMap(it -> "A".equals(it)
                ? Mono.just(it)
                : monoB);
        StepVerifier.create(monoResult).assertNext(response -> {
                    long elapsedTime = System.currentTimeMillis() - startTime;
                    if ("A".equals(monoAResult)) {
                        assertTrue(elapsedTime < 2100, "Execution time exceeded 2100 ms for result 'A'");
                    } else {
                        assertTrue(elapsedTime < 8100, "Execution time exceeded 8100 ms for result 'B'");
                    }
                })
                .verifyComplete();
    }
}