I've read in the documentation that flatMap:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.
that flatMapSequential:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.
and that concatMap:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation. There are three dimensions to this operator that can be compared with flatMap and flatMapSequential:
Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
Ordering of the flattened values: this operator naturally preserves the same order as the source elements, concatenating the inners from each source element sequentially.
Interleaving: this operator does not let values from different inners interleave (concatenation).
The difference between flatMap and the other two is pretty understandable, but I don't understand when the difference between concatMap and flatMapSequential actually matters. Is there any performance difference between the two? I've read that flatMapSequential has a buffer size for some queue, but I don't understand why concatMap doesn't need one.
The flatMap and flatMapSequential operators subscribe eagerly, whereas concatMap waits for each inner publisher to complete before generating the next sub-stream and subscribing to it.
Let's see an example:
@Test
void test_flatMap() {
    Flux.just(1, 2, 3)
        .flatMap(this::doSomethingAsync)
        //.flatMapSequential(this::doSomethingAsync)
        //.concatMap(this::doSomethingAsync)
        .doOnNext(n -> log.info("Done {}", n))
        .blockLast();
}

private Mono<Integer> doSomethingAsync(Integer number) {
    // Add some delay for the second item...
    return number == 2
        ? Mono.just(number).doOnNext(n -> log.info("Executing {}", n)).delayElement(Duration.ofSeconds(1))
        : Mono.just(number).doOnNext(n -> log.info("Executing {}", n));
}
Output:
2022-04-22 19:38:49,164 INFO main - Executing 1
2022-04-22 19:38:49,168 INFO main - Done 1
2022-04-22 19:38:49,198 INFO main - Executing 2
2022-04-22 19:38:49,200 INFO main - Executing 3
2022-04-22 19:38:49,200 INFO main - Done 3
2022-04-22 19:38:50,200 INFO parallel-1 - Done 2
As you can see, flatMap does not preserve the original ordering and has subscribed to all three elements eagerly. Also, notice that element 3 was emitted before element 2.
Here is the output using flatMapSequential:
2022-04-22 19:53:40,229 INFO main - Executing 1
2022-04-22 19:53:40,232 INFO main - Done 1
2022-04-22 19:53:40,261 INFO main - Executing 2
2022-04-22 19:53:40,263 INFO main - Executing 3
2022-04-22 19:53:41,263 INFO parallel-1 - Done 2
2022-04-22 19:53:41,264 INFO parallel-1 - Done 3
flatMapSequential has subscribed to all three elements eagerly, like flatMap, but preserves the order by queuing elements received out of order.
Here is the output using concatMap:
2022-04-22 19:59:31,817 INFO main - Executing 1
2022-04-22 19:59:31,820 INFO main - Done 1
2022-04-22 19:59:31,853 INFO main - Executing 2
2022-04-22 19:59:32,857 INFO parallel-1 - Done 2
2022-04-22 19:59:32,857 INFO parallel-1 - Executing 3
2022-04-22 19:59:32,857 INFO parallel-1 - Done 3
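concatMap naturally preserves the same order as the source elements, because it only subscribes to the next inner publisher once the previous one has completed (note that "Executing 3" is logged after "Done 2").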
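As for the buffer question, it may help to look at a toy model in plain Java. This is only a sketch of the ordering logic, not Reactor's actual implementation: the class MergeOrderModel and its methods are hypothetical names, each inner is assumed to emit exactly one distinct value, and the "completion order" is passed in by hand instead of coming from real asynchronous work.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (NOT Reactor code) of how completion order maps to emission order. */
final class MergeOrderModel {

    /** flatMap-like: all inners are already running, so values go out as they complete. */
    static List<Integer> mergeEagerly(List<Integer> completionOrder) {
        return new ArrayList<>(completionOrder);
    }

    /**
     * flatMapSequential-like: all inners are already running, but a value that
     * completes early is parked until every earlier source element has been emitted.
     * Appends the emitted values to 'emitted' and returns the peak number of
     * values that had to wait in the parking area.
     */
    static int mergeSequentially(List<Integer> sourceOrder,
                                 List<Integer> completionOrder,
                                 List<Integer> emitted) {
        Set<Integer> parked = new HashSet<>(); // assumes distinct values
        int next = 0;                          // index of the next value allowed out
        int peakParked = 0;
        for (Integer done : completionOrder) {
            parked.add(done);
            // Drain everything that is now in source order.
            while (next < sourceOrder.size() && parked.remove(sourceOrder.get(next))) {
                emitted.add(sourceOrder.get(next));
                next++;
            }
            peakParked = Math.max(peakParked, parked.size());
        }
        return peakParked;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3);
        // Element 2 is delayed, so it completes last (as in the test above).
        List<Integer> eagerCompletion = List.of(1, 3, 2);

        System.out.println("flatMap-like: " + mergeEagerly(eagerCompletion));
        // prints: flatMap-like: [1, 3, 2]

        List<Integer> sequential = new ArrayList<>();
        int peak = mergeSequentially(source, eagerCompletion, sequential);
        System.out.println("flatMapSequential-like: " + sequential + ", peak parked = " + peak);
        // prints: flatMapSequential-like: [1, 2, 3], peak parked = 1

        // concatMap-like: inner n+1 is not started until inner n completes,
        // so the completion order can only ever be the source order.
        List<Integer> concat = new ArrayList<>();
        int concatPeak = mergeSequentially(source, source, concat);
        System.out.println("concatMap-like: " + concat + ", peak parked = " + concatPeak);
        // prints: concatMap-like: [1, 2, 3], peak parked = 0
    }
}
```

In this model, the value 3 has to sit somewhere while 2 is still pending, which is the role the queue plays in flatMapSequential. With concatMap only one inner is active at a time, so nothing can ever arrive out of order and nothing needs to be parked. The price is that the inners no longer run concurrently, which matches the timestamps above: element 3 is not even executed until element 2 is done.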