spring-webflux project-reactor

Can doOnNext's side-effect be skipped?


I am currently writing a method that needs to add each element to a ConcurrentHashMap before continuing downstream. For example:

public Flux<Bar> methodOne(final Flux<Foo> foos) {
    Map<UUID, Foo> idToFoo = new ConcurrentHashMap<>();
    return foos.doOnNext(foo -> idToFoo.put(foo.getId(), foo))
               .flatMap(foo -> fooToBar(foo, idToFoo));
}

Can I be certain that, inside fooToBar(), the passed map will already contain the corresponding element?

From my understanding, doOnNext() is synchronous: it executes and completes the lambda before continuing downstream. However, I am getting pushback from a senior dev, who thinks that because this is a side effect it is a 'fire and forget' type of call.

I am working on understanding the source code better so I can justify the use case but wanted to check here as well.

Thanks


Solution

  • From my understanding, doOnNext() is synchronous: it executes and completes the lambda before continuing downstream. However, I am getting pushback from a senior dev, who thinks that because this is a side effect it is a 'fire and forget' type of call.

    You are correct. doOnNext() invokes its consumer synchronously, so unless the consumer itself hands work off to another thread, it will have fully executed before the next operator in the chain sees the element.

    From the Reactor API docs:

    The Consumer is executed first, then the onNext signal is propagated downstream.
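
    Since the question mentions reading the source, here is a minimal plain-Java sketch of the ordering that sentence describes. It is a simplified model, not Reactor's actual implementation: PeekModel, peek and run are hypothetical names, and a java.util.function.Consumer stands in for the downstream subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of a doOnNext-style operator. This is NOT Reactor's source;
// PeekModel, peek and run are hypothetical names used only for illustration.
class PeekModel {

    // Wraps a downstream consumer so that the side effect runs to completion
    // on the calling thread before the element is handed downstream.
    static <T> Consumer<T> peek(Consumer<T> sideEffect, Consumer<T> downstream) {
        return element -> {
            sideEffect.accept(element);  // 1. run the side effect synchronously
            downstream.accept(element);  // 2. only then propagate the element
        };
    }

    // Pushes one element through the model and returns the order of events.
    static List<String> run(int element) {
        List<String> events = new ArrayList<>();
        Consumer<Integer> chain = peek(
                i -> events.add("side effect for " + i),
                i -> events.add("downstream saw " + i));
        chain.accept(element);
        return events;
    }

    public static void main(String[] args) {
        // Prints: [side effect for 1, downstream saw 1]
        System.out.println(run(1));
    }
}
```

    In this model the side effect and the hand-off to downstream are two statements in the same method, so there is no point at which the element can move on while the side effect is still pending.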

    You can demonstrate this behaviour with the following example:

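        public static void main(String[] args) {
            Mono.just(1)
                    .doOnNext(i -> {
                        try {
                            // Block the calling thread inside the side effect
                            Thread.sleep(10_000);
                            System.out.println("Waiting for " + i);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // restore the interrupt flag
                            throw new RuntimeException(e);
                        }
                    })
                    // Runs only after the consumer above has returned
                    .map(i -> i + 1)
                    .subscribe(i -> System.out.println("Completed with value " + i));
        }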
    

    When this main method is run, the output always appears in the following order, no matter how long the Thread.sleep delay is:

    Waiting for 1
    Completed with value 2
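
    The "fire and forget" concern only becomes real when the consumer itself schedules the work elsewhere and returns immediately. The sketch below contrasts that with the plain put from the question. It is plain Java rather than Reactor code, under stated assumptions: SideEffectTiming and its methods are hypothetical names, the "downstream" step is modelled as a containsKey check, and a latch holds the asynchronous put back so that the outcome is deterministic.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Simplified stand-in for doOnNext followed by the next operator; not Reactor code.
// SideEffectTiming and its methods are hypothetical names for illustration.
class SideEffectTiming {

    // Runs the side effect, then reports whether the "downstream" step can see the key.
    static boolean downstreamSeesKey(Map<String, String> map, String key,
                                     Consumer<String> sideEffect) {
        sideEffect.accept(key);       // doOnNext-style step, runs on the caller's thread
        return map.containsKey(key);  // what the next operator would observe
    }

    // Case 1: a plain synchronous put, as in the question.
    static boolean synchronousPut() {
        Map<String, String> map = new ConcurrentHashMap<>();
        return downstreamSeesKey(map, "foo-1", k -> map.put(k, "value"));
    }

    // Case 2: the consumer only schedules the put on another thread and returns.
    // The latch delays the put until downstream has looked, making the race deterministic.
    static boolean fireAndForgetPut() {
        Map<String, String> map = new ConcurrentHashMap<>();
        CountDownLatch downstreamHasLooked = new CountDownLatch(1);
        CompletableFuture<?>[] pending = new CompletableFuture<?>[1];
        boolean seen = downstreamSeesKey(map, "foo-1", k ->
                pending[0] = CompletableFuture.runAsync(() -> {
                    try {
                        downstreamHasLooked.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    map.put(k, "value");
                }));
        downstreamHasLooked.countDown();
        pending[0].join();            // the put does happen, just too late for downstream
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("synchronous put visible downstream: " + synchronousPut());
        System.out.println("fire-and-forget put visible downstream: " + fireAndForgetPut());
    }
}
```

    The method in the question matches the first case: idToFoo.put(...) is an ordinary blocking call inside the consumer, so the entry is in the map by the time fooToBar() runs for that element.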