java spring-webflux project-reactor

Measure elapsed time on Spring reactor


I am using spring-webflux version 3.2.x

I want to log the total elapsed time of a whole pipeline execution.

I have found the elapsed() method, but I want the total time, not the time per item (doOnNext).

I saw a lot of options, but I am not sure which one is correct to use.

My code:

private Mono<MyResponse> getData(String slug) {

var stopWatch = StopWatch.createStarted();

return getProducts(
    Mono.zip(
        proxy.getExternalData(slug),
        getProductPersonalization(slug)))
    .doOnSuccess(it -> log.info("Finish with success:"))
    .doOnTerminate(() -> {
      stopWatch.stop();
      log.info("getData execution time: {} ms",
          stopWatch.getTime(TimeUnit.MILLISECONDS));
    });

}

  private Mono<MyResponse> getProducts(Mono<Tuple2<ProductExternalResponse, ProductPersonalization>> tuple);

Note: I read that using a StopWatch outside the pipeline is wrong, which is why I am looking for another solution.

The Mono returned by this method is passed back to a controller.


Solution

  • You can measure the time a Mono takes to go from subscription to termination by using Java's System.nanoTime(). Save the current nano time on subscription (in a thread-safe structure, such as an AtomicLong), and then, on termination, subtract it from the nano time at that point.

    Note, however, that this solution also measures the time during which your Mono was suspended, because we cannot detect when a Mono operation is paused.

    So, for example, you can write a method that logs the elapsed time when the Mono reaches termination:

    public static <V> Mono<V> logExecTimeMs(Mono<V> toMeasure, Logger logger) {
        var startTime = new AtomicLong();
        return toMeasure
                // Save current time on execution start
                .doOnSubscribe(s -> startTime.set(System.nanoTime()))
                // On termination, subtract the start time from the current time to estimate the execution time
                .doOnTerminate(() -> {
                    var execTimeNano =  System.nanoTime() - startTime.get();
                    var execTimeMs = execTimeNano * 1e-6;
                    logger.debug("Execution time (ms): {}", execTimeMs);
                });
    }
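
    The helper above shares a single AtomicLong between all subscribers of the returned Mono, so two overlapping subscriptions would overwrite each other's start time. A possible variant, sketched under the assumption that the Mono may be subscribed more than once, wraps the pipeline in Mono.defer so that each subscription captures its own start time. The class name, the method name and the LongConsumer callback (used here instead of a Logger only to keep the sketch self-contained) are hypothetical:

    ```java
    import java.time.Duration;
    import java.util.function.LongConsumer;
    import reactor.core.publisher.Mono;

    public final class PerSubscriptionTimer {

        private PerSubscriptionTimer() {}

        /**
         * Hypothetical helper: reports the elapsed nanoseconds between subscription
         * and termination (value, empty completion or error), once per subscription.
         */
        public static <V> Mono<V> timePerSubscription(Mono<V> toMeasure, LongConsumer onElapsedNanos) {
            // Mono.defer runs this lambda for every subscriber,
            // so each subscription gets its own start time.
            return Mono.defer(() -> {
                long start = System.nanoTime();
                return toMeasure.doOnTerminate(
                        () -> onElapsedNanos.accept(System.nanoTime() - start));
            });
        }

        public static void main(String[] args) {
            Mono<String> timed = timePerSubscription(
                    Mono.just("foo").delayElement(Duration.ofMillis(50)),
                    nanos -> System.out.println("Execution time (ms): " + nanos * 1e-6));

            // Each block() call is a separate subscription and is measured independently.
            timed.block();
            timed.block();
        }
    }
    ```

    Like the original helper, this sketch relies on doOnTerminate, so it does not report anything when the subscription is cancelled.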
    

    You can also create one that behaves like Reactor's elapsed() operator and returns the measured time as part of the pipeline output:

    /**
     * Measure time it took for input mono to reach termination from subscription (computation start).
     *
     * @param pipelineToMeasure User Mono to time.
     * @return Input Mono, decorated.
     *         The output is a tuple whose first value is execution time in nanoseconds.
     *         The second value is decorated mono result.
     */
    public static <V> Mono<Tuple2<Long, V>> timeItNano(Mono<V> pipelineToMeasure) {
        var timeCache = new AtomicLong();
        return pipelineToMeasure
                .doOnSubscribe(s -> timeCache.set(System.nanoTime()))
                .doOnTerminate(() -> timeCache.updateAndGet(start -> System.nanoTime() - start))
                .map(value -> Tuples.of(timeCache.get(), value));
    }
    

    We can verify it with this test case:

    @Test
    public void test() {
        // Create a test mono that takes 50 ms to send next element.
        var exampleMono = Mono.just("foo")
                .delayElement(Duration.ofMillis(50));
    
        // Time it with our example method
        var timed = timeItNano(exampleMono);
    
        StepVerifier.create(timed)
                .assertNext(tuple -> {
                    // Ensure decorated mono result has been preserved
                    assertEquals("foo", tuple.getT2());
                    // Check measured time is approximately correct (must be more than 50 ms, but not too big)
                    var execTimeMs = tuple.getT1() * 1e-6;
                    assertTrue(execTimeMs > 50 && execTimeMs < 100);
                })
                .verifyComplete();
    }
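
    As an alternative to a hand-written helper such as timeItNano, Reactor 3.4 and later provide a built-in timed() operator that wraps the value in a Timed object exposing the time elapsed since subscription. The following is only a minimal sketch, assuming such a Reactor version is on the classpath; the class and method names are hypothetical:

    ```java
    import java.time.Duration;
    import reactor.core.publisher.Mono;
    import reactor.core.publisher.Timed;

    public final class TimedExample {

        private TimedExample() {}

        /** Hypothetical wrapper: decorates the Mono with Reactor's own timed() operator. */
        public static <V> Mono<Timed<V>> timeIt(Mono<V> pipelineToMeasure) {
            return pipelineToMeasure.timed();
        }

        public static void main(String[] args) {
            // Same test Mono as above: takes 50 ms to send its element.
            Timed<String> result = timeIt(
                    Mono.just("foo").delayElement(Duration.ofMillis(50)))
                    .block();

            // get() returns the original value; elapsedSinceSubscription() is a java.time.Duration.
            System.out.println(result.get() + " took "
                    + result.elapsedSinceSubscription().toMillis() + " ms");
        }
    }
    ```

    As with timeItNano, the timing is attached to the emitted value, so nothing is reported for a Mono that completes empty or fails.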