javaspringspring-webfluxflux

Iterate the contents of a flux in a method until a non-empty list is found. Spring WebFLux


I have difficulties in raising this problem because I did not find a use case similar to mine, I am just starting to use WebFLux. I have a method getIdEntrevista that returns a Flux with a list of ‘id’. Method:

public Flux<IdEntrevista> getIdEntrevista(String perfil){
        return this.webClient.baseUrl("http://localhost:8081").build()
                .get()
                .uri("/api/orquestador/v1/entrevistador/public/entrevista_muestra_id?perfil="+perfil)
                .retrieve()
                .bodyToFlux(IdEntrevista.class);
    }

Result:

[
{
  "id": "6639711af44f9905dbdcd889"
},
{
  "id": "663971fdd44ffdd5dbdcd88b"
},
...(29 elements more)
]

I have another method getPreguntas, one that returns a Flux from a repository function interviewTestDao.getPreguntas, this one has as attributes an id and limit, id is the important thing.

    public Flux<SoloPreguntaImp> getPreguntas(String perfil, int limit) {
        //logica para obtener el idEntrevista
        //return this.interviewTestDao.getPreguntas("id",limit);
        return interviewTestDao.getPreguntas("6639711af44f9905dbdcd889",3);
    }

Result of this.interviewTestDao.getPreguntas:

[
  {
    "pregunta": "¿Cuál es tu experiencia trabajando con tecnologías de Java y React en proyectos de desarrollo de software?"
  },
  {
    "pregunta": "¿Has liderado el diseño e implementación de aplicaciones de software complejas utilizando JavaScript, ReactJS y Java?"
  },
  {
    "pregunta": "¿Cómo garantizas la calidad del código, la mantenibilidad y escalabilidad de las aplicaciones de software en las que trabajas?"
  }
]

This is my case: I want to iterate the flux obtained from the getIdEntrevista to get the id and use it in the repository method until find a non-empty list. this because some ids don't have related questions and return empty []. It must respect the non-blocking principle.

I tried alternatives, but they always gave me an error.

public Flux<SoloPreguntaImp> getPreguntas(String perfil, int limit) {
    return getIdEntrevista(perfil)
            .flatMap(idEntrevista ->  interviewTestDao.getPreguntas(idEntrevista.getId(), limit)
                    .collectList()
                    .flatMapMany(list -> {
                        if (list.isEmpty()) {
                            // Si el flujo está vacío, intenta nuevamente después de un período de tiempo
                            return Mono.error(new RuntimeException("Lista vacía"));
                        } else {
                            // Si el flujo no está vacío, emite la lista de preguntas
                            return Flux.fromIterable(list);
                        }
                    }))
            .repeatWhenEmpty(repeat -> repeat.delayElements(Duration.ofSeconds(5)))
      

Thank you in advance


Solution

  • You can use reactor.core.publisher.Flux#next for which documentation reads:

    Emit only the first item emitted by this Flux, into a new Mono. If called on an empty Flux, emits an empty Mono.

    I did this small test:

    @RestController
    public class SO78479713 {
    
      @GetMapping(path = "/78479713")
      public Mono<SoloPreguntaImp> getPreguntas(@RequestParam String perfil) {
        return getIdEntrevista(perfil)
            .flatMap(idEntrevista -> getPreguntasFromEntrevista(idEntrevista.id()))
            .next();
      }
    
      public Flux<IdEntrevista> getIdEntrevista(String perfil) {
        return Flux.just(new IdEntrevista("E_1_%s".formatted(perfil)),
                         new IdEntrevista("E_2_%s".formatted(perfil)),
                         new IdEntrevista("E_3_%s".formatted(perfil)),
                         new IdEntrevista("E_4_%s".formatted(perfil)),
                         new IdEntrevista("E_5_%s".formatted(perfil)),
                         new IdEntrevista("E_6_%s".formatted(perfil)),
                         new IdEntrevista("E_7_%s".formatted(perfil)));
      }
    
      public Flux<SoloPreguntaImp> getPreguntasFromEntrevista(String entrevistaId) {
        // The first few elements are empty, then we get two valid
        // pregunta, then the last one is empty as well.
        return switch (entrevistaId) {
          case "E_5_test" -> Flux.just(new SoloPreguntaImp("FOUND"));
          case "E_6_test" -> Flux.just(new SoloPreguntaImp("SHOULD NOT BE RETURNED"));
          default -> Flux.empty();
        };
      }
    
      record SoloPreguntaImp(String id) {
    
      }
    
      record IdEntrevista(String id) {
    
      }
    
    }
    

    It seems to be working as you expect:

    ➜  ~ curl --silent -X GET --location "http://localhost:8080/78479713?perfil=test" | jq
    {
      "id": "FOUND"
    }