I have difficulties in raising this problem because I did not find a use case similar to mine, I am just starting to use WebFLux.
I have a method getIdEntrevista
that returns a Flux with a list of ‘id’.
Method:
public Flux<IdEntrevista> getIdEntrevista(String perfil){
return this.webClient.baseUrl("http://localhost:8081").build()
.get()
.uri("/api/orquestador/v1/entrevistador/public/entrevista_muestra_id?perfil="+perfil)
.retrieve()
.bodyToFlux(IdEntrevista.class);
}
Result:
[
{
"id": "6639711af44f9905dbdcd889"
},
{
"id": "663971fdd44ffdd5dbdcd88b"
},
...(29 elements more)
]
I have another method getPreguntas
, one that returns a Flux from a repository function interviewTestDao.getPreguntas
, this one has as attributes an id and limit, id is the important thing.
public Flux<SoloPreguntaImp> getPreguntas(String perfil, int limit) {
//logica para obtener el idEntrevista
//return this.interviewTestDao.getPreguntas("id",limit);
return interviewTestDao.getPreguntas("6639711af44f9905dbdcd889",3);
}
Result of this.interviewTestDao.getPreguntas:
[
{
"pregunta": "¿Cuál es tu experiencia trabajando con tecnologías de Java y React en proyectos de desarrollo de software?"
},
{
"pregunta": "¿Has liderado el diseño e implementación de aplicaciones de software complejas utilizando JavaScript, ReactJS y Java?"
},
{
"pregunta": "¿Cómo garantizas la calidad del código, la mantenibilidad y escalabilidad de las aplicaciones de software en las que trabajas?"
}
]
This is my case: I want to iterate the flux obtained from the getIdEntrevista
to get the id and use it in the repository method until find a non-empty list. this because some ids don't have related questions and return empty [].
It must respect the non-blocking principle.
I tried alternatives, but they always gave me an error.
public Flux<SoloPreguntaImp> getPreguntas(String perfil, int limit) {
return getIdEntrevista(perfil)
.flatMap(idEntrevista -> interviewTestDao.getPreguntas(idEntrevista.getId(), limit)
.collectList()
.flatMapMany(list -> {
if (list.isEmpty()) {
// Si el flujo está vacío, intenta nuevamente después de un período de tiempo
return Mono.error(new RuntimeException("Lista vacía"));
} else {
// Si el flujo no está vacío, emite la lista de preguntas
return Flux.fromIterable(list);
}
}))
.repeatWhenEmpty(repeat -> repeat.delayElements(Duration.ofSeconds(5)))
Thank you in advance
You can use reactor.core.publisher.Flux#next
for which documentation reads:
Emit only the first item emitted by this
Flux
, into a newMono
. If called on an emptyFlux
, emits an emptyMono
.
I did this small test:
@RestController
public class SO78479713 {
@GetMapping(path = "/78479713")
public Mono<SoloPreguntaImp> getPreguntas(@RequestParam String perfil) {
return getIdEntrevista(perfil)
.flatMap(idEntrevista -> getPreguntasFromEntrevista(idEntrevista.id()))
.next();
}
public Flux<IdEntrevista> getIdEntrevista(String perfil) {
return Flux.just(new IdEntrevista("E_1_%s".formatted(perfil)),
new IdEntrevista("E_2_%s".formatted(perfil)),
new IdEntrevista("E_3_%s".formatted(perfil)),
new IdEntrevista("E_4_%s".formatted(perfil)),
new IdEntrevista("E_5_%s".formatted(perfil)),
new IdEntrevista("E_6_%s".formatted(perfil)),
new IdEntrevista("E_7_%s".formatted(perfil)));
}
public Flux<SoloPreguntaImp> getPreguntasFromEntrevista(String entrevistaId) {
// The first few elements are empty, then we get two valid
// pregunta, then the last one is empty as well.
return switch (entrevistaId) {
case "E_5_test" -> Flux.just(new SoloPreguntaImp("FOUND"));
case "E_6_test" -> Flux.just(new SoloPreguntaImp("SHOULD NOT BE RETURNED"));
default -> Flux.empty();
};
}
record SoloPreguntaImp(String id) {
}
record IdEntrevista(String id) {
}
}
It seems to be working as you expect:
➜ ~ curl --silent -X GET --location "http://localhost:8080/78479713?perfil=test" | jq
{
"id": "FOUND"
}