spring-webfluxproject-reactorreactorreactive-streams

Whats the difference between flatMap, flatMapSequential and concatMap in Project Reactor?


I've read from the documentation that flatMap:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

that flatMapSequential:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.

and that concatMap:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation. There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:

Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.

Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.

Interleaving: this operator does not let values from different inners interleave (concatenation).

The difference between flatMap and the other two is pretty understandable, but I don't understand when the difference between concatMap and flatMapSequential takes place. Is there any performance difference between the two? I've read that flatMapSequential has a buffer size for some queue, but I don't understand why concatMap doesn't need one.


Solution

  • The flatMap and flatMapSequential operators subscribe eagerly, the concatMap waits for each inner completion before generating the next sub-stream and subscribing to it.

    Let's see an example:

      @Test
      void test_flatMap() {
        Flux.just(1, 2, 3)
            .flatMap(this::doSomethingAsync)
            //.flatMapSequential(this::doSomethingAsync)
            //.concatMap(this::doSomethingAsync)
            .doOnNext(n -> log.info("Done {}", n))
            .blockLast();
      }
    
      private Mono<Integer> doSomethingAsync(Integer number) {
        //add some delay for the second item...
        return number == 2 ? Mono.just(number).doOnNext(n -> log.info("Executing {}", n)).delayElement(Duration.ofSeconds(1))
            : Mono.just(number).doOnNext(n -> log.info("Executing {}", n));
      }
    

    Output:

    2022-04-22 19:38:49,164  INFO main - Executing 1
    2022-04-22 19:38:49,168  INFO main - Done 1
    2022-04-22 19:38:49,198  INFO main - Executing 2
    2022-04-22 19:38:49,200  INFO main - Executing 3
    2022-04-22 19:38:49,200  INFO main - Done 3
    2022-04-22 19:38:50,200  INFO parallel-1 - Done 2
    

    As you can see, flatMap does not preserve original ordering, and has subscribed to all three elements eagerly. Also, notice that element 3 has proceeded before element 2.

    Here is the output using flatMapSequential:

    2022-04-22 19:53:40,229  INFO main - Executing 1
    2022-04-22 19:53:40,232  INFO main - Done 1
    2022-04-22 19:53:40,261  INFO main - Executing 2
    2022-04-22 19:53:40,263  INFO main - Executing 3
    2022-04-22 19:53:41,263  INFO parallel-1 - Done 2
    2022-04-22 19:53:41,264  INFO parallel-1 - Done 3
    

    flatMapSequential has subscribed to all three elements eagerly like flatMap but preserves the order by queuing elements received out of order.

    Here is the output using concatMap:

    2022-04-22 19:59:31,817  INFO main - Executing 1
    2022-04-22 19:59:31,820  INFO main - Done 1
    2022-04-22 19:59:31,853  INFO main - Executing 2
    2022-04-22 19:59:32,857  INFO parallel-1 - Done 2
    2022-04-22 19:59:32,857  INFO parallel-1 - Executing 3
    2022-04-22 19:59:32,857  INFO parallel-1 - Done 3
    

    concatMap naturally preserves the same order as the source elements.