I am giving Spring Cloud Kafka Reactive and Reactive Mongo a try. I am not experienced with Reactor, and I keep ending up with a nested Flux<Mono<>>, as described below.
Here are the methods and snippets:
Kafka messages are bound to a function method by Spring Cloud Stream. Both input and output are Flux:
public Function<Flux<Employee>, Flux<Message<Employee>>> enrich() {
return flux-> ...
}
I have classic Spring Data MongoDB repository classes and query methods:
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String>{
Mono<Employee> findOneById(String id);
}
I need to extract the id from each incoming Kafka message in the Flux and pass it to the MongoDB repository method. After extracting the id, I always end up with a nested Mono from the db call. I have tried multiple ways, but with map or zipWith I always end up with a Mono or Flux inside the Flux:
public Function<Flux<Employee>, Flux<Message<Employee>>> enrich(){
return flux-> flux.map(msg -> MessageBuilder.withPayload(findOneById(msg.getId())).build()) ....
}
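Is there a way, with zipWith or another operator, to extract the id from the Flux and pass it to the db call in one step, so that I avoid getting a Mono inside a Flux?
Use flatMap instead of map. map wraps whatever the lambda returns, so a lambda that returns a Mono produces a Flux<Mono<...>>. flatMap subscribes to each inner Mono and merges its result into the outer Flux, so the output is not nested. It can also process several elements concurrently, which means results may arrive in a different order than the inputs.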
public Function<Flux<Employee>, Flux<Message<Employee>>> enrich(EmployeeRepository employeeRepository) {
    return flux -> flux.flatMap(employee ->
        employeeRepository.findOneById(employee.getId())
            .map(enrichedEmployee -> MessageBuilder.withPayload(enrichedEmployee).build())
    );
}
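If the difference between map and flatMap is still unclear, it may help to see the same shape in plain JDK types, with no Reactor or Spring on the classpath. The sketch below is only an analogy: Stream stands in for Flux, Optional stands in for Mono (zero or one value), and the class name FlattenSketch, the STORE map and its sample data are hypothetical stand-ins for the repository. Unlike Reactor, everything here runs synchronously.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class FlattenSketch {
    // Hypothetical in-memory stand-in for the repository: id -> stored name.
    static final Map<String, String> STORE = Map.of("1", "Ada", "2", "Grace");

    // Plays the role of findOneById: yields zero or one result, like a Mono.
    static Optional<String> findOneById(String id) {
        return Optional.ofNullable(STORE.get(id));
    }

    // map keeps the wrapper: every element is still an Optional (nested shape).
    static List<Optional<String>> withMap(List<String> ids) {
        return ids.stream()
                .map(FlattenSketch::findOneById)
                .collect(Collectors.toList());
    }

    // flatMap unwraps each result and drops the empty ones (flat shape).
    static List<String> withFlatMap(List<String> ids) {
        return ids.stream()
                .flatMap(id -> findOneById(id).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1", "2", "3");
        System.out.println(withMap(ids));     // [Optional[Ada], Optional[Grace], Optional.empty]
        System.out.println(withFlatMap(ids)); // [Ada, Grace]
    }
}
```

Note that id "3" has no match and simply disappears from the flattened result. Reactor's flatMap behaves the same way when findOneById returns an empty Mono, so an employee that is not in MongoDB produces no output message unless you add a fallback such as switchIfEmpty.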