I've started using Project Reactor, and one place where I'm struggling a little is how to combine values coming from a Mono with values coming from a Flux. Here's my use case:
public interface GroupRepository {
    Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
    Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(userIds);
// Run the two calls above in parallel and associate the users with the group.
What I want to achieve is this: combine the response from userFlux with the group, associating those users with it via something like group.addUsers(usersFromFlux).
Can someone help me combine the results coming from userFlux and groupMono? I think I could use something like zip, but that does a one-to-one mapping between the sources. In my case I need a 1-to-N mapping: I have one group but multiple users that I need to add to it. Is it a good idea to return a Mono<List<User>> instead, and then use the zip operator with both Monos and provide a combinator, as mentioned here?
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
        Publisher<? extends T2> source2,
        final BiFunction<? super T1, ? super T2, ? extends O> combinator)
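For reference, the collect-then-zip idea described above could be sketched roughly as follows. This is only an illustration under assumptions: the bodies of User and GroupModel, the addUsers(List<User>) signature, and the class and method names ZipGroupWithUsers and groupWithUsers are hypothetical stand-ins, since the question does not show them. It uses Mono.zip with a combinator rather than Flux.zip, because both inputs are Monos once the users have been collected.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ZipGroupWithUsers {

    // Hypothetical stand-in for the User type from the question.
    static class User {
        final long id;
        User(long id) { this.id = id; }
    }

    // Hypothetical stand-in for GroupModel; addUsers is assumed to take a list.
    static class GroupModel {
        final long id;
        final List<User> users = new ArrayList<>();
        GroupModel(long id) { this.id = id; }
        void addUsers(List<User> toAdd) { users.addAll(toAdd); }
    }

    static Mono<GroupModel> groupWithUsers(Mono<GroupModel> groupMono, Flux<User> userFlux) {
        // collectList() turns the Flux into a Mono<List<User>> that emits once,
        // when the Flux completes (an empty Flux yields an empty list).
        Mono<List<User>> usersMono = userFlux.collectList();

        // Mono.zip subscribes to both sources and calls the combinator
        // once both single values are available.
        return Mono.zip(groupMono, usersMono, (group, users) -> {
            group.addUsers(users);
            return group;
        });
    }

    public static void main(String[] args) {
        GroupModel result = groupWithUsers(
                Mono.just(new GroupModel(1L)),
                Flux.just(new User(10L), new User(11L))).block();
        System.out.println(result.users.size());
    }
}
```

The trade-off in this sketch is that all users are buffered in memory before the group is emitted, which is usually acceptable when the user set is bounded by a known set of IDs.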
I think the Flux.combineLatest static method could help you there: since your Mono only ever emits 1 element, that element will always be the one combined with each incoming value from the Flux.
Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
groupMono, userFlux);
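A self-contained version of the snippet above might look like the sketch below. The User, GroupModel and Combination classes, as well as the CombineLatestSketch and pair names, are hypothetical placeholders, because neither the question nor the answer defines them; only the Flux.combineLatest call itself is taken from the answer.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CombineLatestSketch {

    // Hypothetical stand-ins for the types used in the question.
    static class User {
        final long id;
        User(long id) { this.id = id; }
    }

    static class GroupModel {
        final long id;
        GroupModel(long id) { this.id = id; }
    }

    // Hypothetical holder pairing the single group with one user.
    static class Combination {
        final GroupModel group;
        final User user;
        Combination(GroupModel group, User user) {
            this.group = group;
            this.user = user;
        }
    }

    static Flux<Combination> pair(Mono<GroupModel> groupMono, Flux<User> userFlux) {
        // The combinator receives the latest value of each source, in source order:
        // arr[0] comes from groupMono, arr[1] from userFlux.
        return Flux.combineLatest(
                arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                groupMono, userFlux);
    }

    public static void main(String[] args) {
        pair(Mono.just(new GroupModel(1L)),
                Flux.just(new User(10L), new User(11L), new User(12L)))
                .doOnNext(c -> System.out.println(c.group.id + " <- " + c.user.id))
                .blockLast();
    }
}
```

One thing to keep in mind: combineLatest only retains the most recent value of each source. If the Flux emits several users before the Mono has produced the group, only the latest of those users is paired with it and the earlier ones are skipped. When both sources are asynchronous and every user must be attached, collecting the Flux with collectList() and zipping the two Monos, as the question suggests, avoids that.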