javaspring-bootspring-webfluxproject-reactorreactive

How to calculate total time process took in Java reactive


I have a Java reactive spring boot app , where I am doing below logic, I want to log how much time does it takes to complete this whole method. I can change the method signature , but would like to keep as is.

public Flux<void> verifyStuff(){
  return client.getSomeData1(query)
        .doOnNext(
            result1 -> log.info("Retrieved '{}' ", result1.size()))
        .flatMapMany(result1 -> Flux.fromStream(result1.stream())
            .concatMap(this::verifyresult1()));
 }

So the method actaully reactively gets a list of results and calls another service sequentially.

I tried using elapsed() but cant get my head warpped around as to how it works. Also tried below version.

public Flux<void> verifyStuff(){
  StopWatch stopWatch = new StopWatch();
    stopWatch.start();
  return client.getSomeData1(query)
        .doOnNext(
            result1 -> log.info("Retrieved '{}' ", result1.size()))
        .flatMapMany(result1 -> Flux.fromStream(result1.stream())
            .concatMap(this::verifyresult1())
        .doFinally(signalType -> {
          stopWatch.stop();
          log.info("To execution time for verifying marketplace partners is {}ms",
              stopWatch.getTotalTimeMillis());
        });;
 }

But this completes the log before all the flux is resolved under "verifyresult1"


Solution

  • You're right with the StopWatch approach, but you're encountering a timing issue because the "doFinally" in your current setup is executing after the "verifyStuff" method has finished, but before all the asynchronous operations in the Flux have completed. This is because Flux operations, such as flatMapMany, are asynchronous and non-blocking. As a result, doFinally can be triggered while the Flux is still processing items.

    To properly measure the execution time of the entire method, you need to ensure that the StopWatch starts when the asynchronous flow begins and stops when the entire Flux pipeline completes.

    Try using elapsed() operator Leverage the elapsed() operator in Flux to measure the time it takes for the entire Flux sequence to complete. The elapsed() operator provides a Tuple2<Long, T> where the first value is the elapsed time in milliseconds, and the second is the actual emitted item.

    Try this if it works:

    public Flux<Void> verifyStuff() {
        // Record the start time using elapsed
        return client.getSomeData1(query)
            .doOnNext(result1 -> log.info("Retrieved '{}' ", result1.size()))
            .flatMapMany(result1 -> Flux.fromStream(result1.stream())
                .concatMap(this::verifyresult1))
            .elapsed()  // This will emit a tuple of elapsed time and the last emitted item
            .doOnTerminate(() -> log.info("Total execution time: {}ms", elapsed().getT1()));
    }