I need to make sure all operations on all user items have been completed before doing something with the user record.
Does reduce() wait for all elements from the flatMap() before proceeding? If not, how can I do that?
Flux.fromIterable(userIds)
    // each element has multiple userIds
    .flatMap(userId -> getUserItems(userId)
        .map(userItemIds -> doSomethingWithUserItemRecord(userId, userItemIds)))
    .reduce(new Result(0, true), this::mergeResult)
    // batch operations on all userIds in an element from the original source
    .map(result -> doSomethingWithUsersRecords(result))
As per the discussion in the comments, reduce() processes all the elements before completing. It does not hold them back until the end, though: it starts accumulating as soon as the first element is emitted, and it emits its result (and completes) only when the source has completed, i.e., when no more items are expected.
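To make that ordering visible, here is a minimal sketch, assuming reactor-core is on the classpath. The class name ReduceCompletionDemo and the inner Flux.just(...) are made up for illustration; the inner publisher only stands in for getUserItems(userId). It records each event so you can see that accumulation is interleaved with emission, while the result appears only after the last element.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class ReduceCompletionDemo {

    // Runs a small synchronous pipeline and returns the events in the order they happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just(1, 2, 3)
                // hypothetical inner publisher, standing in for getUserItems(userId)
                .flatMap(id -> Flux.just(id * 10, id * 10 + 1))
                .doOnNext(item -> events.add("item " + item))
                // the accumulator is called for each element as it arrives
                .reduce(0, (acc, item) -> {
                    events.add("accumulate " + item);
                    return acc + item;
                })
                // the reduced value is emitted only once the flatMap output has completed
                .doOnNext(sum -> events.add("result " + sum))
                .block();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this synchronous setup the first two events are "item 10" and "accumulate 10", and the last one is "result 123", so anything chained after reduce() sees the merged value only after every inner publisher has finished.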
If the reducer is just merging all elements into a list, then collectList() does that for you. It also waits until all the source items have been produced before passing them to the next step, i.e., you get the final list of items if and only if there were no errors in the source publisher.
If the intermediate results are not needed, and you just want to wait until the input has been fully produced without any error, then a simple then() would be better.
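The difference between the two can be sketched as follows, again assuming reactor-core is on the classpath. WaitForAllDemo and its method names are hypothetical, and the empty-list fallback is only there to show that no partial list is emitted when the source fails.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WaitForAllDemo {

    // collectList(): buffers every element and emits one list when the source completes.
    static List<Integer> collected() {
        return Flux.range(1, 4)
                .map(i -> i * i)
                .collectList()
                .block();
    }

    // then(): ignores the elements and continues only after successful completion.
    static String afterCompletion() {
        return Flux.range(1, 4)
                .then(Mono.fromSupplier(() -> "all done"))
                .block();
    }

    // If the source errors, collectList() emits no list at all; the error is propagated instead.
    static List<Integer> collectedWithError() {
        return Flux.just(1, 2, 0)
                .map(i -> 10 / i)              // the third element throws ArithmeticException
                .collectList()
                .onErrorReturn(List.of())      // illustrative fallback so block() does not throw
                .block();
    }

    public static void main(String[] args) {
        System.out.println(collected());
        System.out.println(afterCompletion());
        System.out.println(collectedWithError());
    }
}
```

Here collected() yields [1, 4, 9, 16], while collectedWithError() yields the empty fallback rather than the two values that were computed before the failure.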
All of the operators mentioned above wait for the source to complete before the next operation runs, so the choice depends on what you want to do with the original items.