java spring-webflux r2dbc r2dbc-mssql

Spring WebFlux: Wait for All Flux<T> Items Before Returning the Response


I'm working with Spring WebFlux + R2DBC to insert some data into a database.

Call for data -> Put in database -> Return data.

Controller

@RestController
public class UtilController {
  private final FooService service;

  @Autowired
  public UtilController(FooService service) { this.service = service; }

  @GetMapping(path = "/load-data")
  public Mono<List<Foo>> loadFoos() { return service.addFoos(); }
}

Service

public Mono<List<Foo>> addFoos() {
  return webClient.getSomeStuff()
      .map(results -> results.stream()
          .map(/* ...transform to domain model */)
          .toList())
      .map(foos -> {
        // This is the problem site.
        List<Foo> buf = new ArrayList<>();
        foos.forEach(f -> {
          fooDao.createFoo(f)
              .doOnNext(buf::add)
              .next()
              .subscribe();
        });

        return buf;
      });
}

DAO

public Flux<Foo> createFoo(final Foo foo) {
  var query = "<INSERT_STUFF_STATEMENT>";

  // "publisher" is private final Publisher<? extends Connection> publisher;
  var connection = Mono.from(publisher);
  return connection.flatMapMany(c -> c.createStatement(query)
      .bind("STUFF", foo.getStuff())
      .execute())
      .map(result -> Foo.builder()
          .stuff(whatever)
          .build());
}

The main problem is that I can't figure out how to make it wait for all the results to be returned and added to the final List<Foo> before it goes back to the controller. There must be some combination of map/flatMap/flatMapMany/etc. that I'm missing. As it stands, only about 80% of the data gets into the database and about 10% into the list before the controller sends back the response and the processing stops.


Solution

  • OK, first of all: when a request hits a reactive endpoint, the framework subscribes to the Mono or Flux you returned. That subscription travels through your operators in the opposite direction, upstream, until it reaches a producer.

    A producer is something that emits items for us, for instance the Mono<T> or Flux<T> returned by WebClient or by an R2DBC query.

    Strictly speaking, calling the operators to build the chain is the assembly phase, and this upstream walk happens at subscription time. If a publisher is created somewhere but never linked into the chain you return, you have done what we call breaking the reactive chain.

    When the chain is broken, the framework's subscription never reaches that producer, so the framework cannot make it produce items, and it has no way to wait for them either.

    You have probably heard the phrase "nothing happens until you subscribe"; it also implies that the chain between the subscriber and the producer must be intact.
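    To see "nothing happens until you subscribe" in isolation, here is a minimal sketch using the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract that Reactor implements. It is an analogy, not Reactor code: LazyInsertDemo, its createFoo stand-in and the insertsRun counter are hypothetical names made up for the illustration. Building the publisher does no work; the simulated insert runs only when a subscriber requests data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Analogy with java.util.concurrent.Flow (NOT Reactor): a cold publisher
// does no work until something subscribes and requests data.
class LazyInsertDemo {
    // Counts how often the simulated database insert actually ran.
    static final AtomicInteger insertsRun = new AtomicInteger();

    // Hypothetical stand-in for fooDao.createFoo: returns a *description* of the insert.
    static Flow.Publisher<String> createFoo(String foo) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                done = true;
                if (n <= 0) { // Reactive Streams: non-positive demand is an error
                    subscriber.onError(new IllegalArgumentException("demand must be > 0"));
                    return;
                }
                insertsRun.incrementAndGet(); // the "insert" happens only on demand
                subscriber.onNext(foo);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Minimal subscriber: requests everything and records what it receives.
    static List<String> subscribeAndCollect(Flow.Publisher<String> publisher) {
        List<String> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); } // not expected here
            @Override public void onComplete() { }
        });
        return received; // filled synchronously, because this publisher emits on request
    }
}
```

    In WebFlux, the role of subscribeAndCollect is played by the framework, which subscribes to the Mono or Flux your controller returns. That is why the returned publisher must lead back to every producer you want to run.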

    List<Foo> buf = new ArrayList<>();
    foos.forEach(f -> {
        // createFoo returns a Flux that is never connected to the chain you return
        fooDao.createFoo(f)
            .doOnNext(buf::add)
            .next()
            .subscribe();
    });

    return buf;

    The comment in the code above marks where you are breaking the chain.

    You get foos and iterate over its items. For each foo you save to the database, you update shared state (adding the item to buf). What you are missing is that fooDao.createFoo returns a producer (a Flux) that you never connect to the rest of the chain. Because each subscribe() call starts its insert independently, the lambda inside map returns buf immediately, usually before the inserts have finished, and the response is built from whatever happens to be in the list at that moment.
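    The same race can be reproduced without Reactor. The sketch below is a plain-JDK analogy using CompletableFuture, with hypothetical names (BrokenChainDemo, createFoo, addFoosBroken); a CountDownLatch stands in for database latency so that the outcome is deterministic. As in the question, each insert is started and its result thrown away, so the list is handed back before anything has been added to it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy (CompletableFuture, NOT Reactor) of the broken chain.
class BrokenChainDemo {
    // Hypothetical async insert: blocks until `gate` opens (simulated latency), then yields foo.
    static CompletableFuture<String> createFoo(String foo, CountDownLatch gate, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return foo;
        }, pool);
    }

    // Mirrors forEach + subscribe(): every returned future is dropped on the floor.
    static List<String> addFoosBroken(List<String> foos, CountDownLatch gate, ExecutorService pool) {
        List<String> buf = Collections.synchronizedList(new ArrayList<>());
        foos.forEach(f -> createFoo(f, gate, pool).thenAccept(buf::add)); // result ignored
        return buf; // returned before any insert has completed
    }
}
```

    Calling addFoosBroken with three items returns an empty list; the items are only added after the gate opens, by which time the caller has already used the list.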

    Avoid shared mutable state in reactive code; prefer pure functions that transform the items flowing through the chain.

    You probably saw that nothing was getting stored, so you slapped on a subscribe to try to "activate" the saving.

    In general, the subscriber is what starts the entire chain. In your case, that is WebFlux, which subscribes to the publisher your controller returns on behalf of the calling client. So you should not be subscribing anywhere in your own code. Your job is to build the chain that produces items, so that they can be delivered to the calling client.

    So what you should have done is something like:

    return fooDao.createFoo(item);
    

    You always need to handle the returned publishers and chain on them. You can't ignore them.

    I rewrote your code so that it doesn't build a List<T> by hand. In general, you should not iterate over a list with forEach in reactive Java. Instead, place the list into a Flux so that it emits the items from the list; that is what Flux#fromIterable does.

    A List has a fixed, known length, while a Flux can be unbounded. It emits items as the source produces them and signals completion when there are none left, and with a source such as a Sinks.many() sink, new items can even be pushed in while it is already emitting. It is more like a queue than a list.
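    As an illustration of a source whose length is not known up front, the sketch below uses the JDK's SubmissionPublisher rather than Reactor (in Reactor, a Sinks.many() sink plays a similar role). QueueLikePublisherDemo and pushWhileConsuming are hypothetical names. The consumer subscribes first, items are pushed while it may already be receiving them, and the stream ends only when close() sends the completion signal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

// JDK analogy (SubmissionPublisher, NOT Reactor): no fixed size, items arrive over
// time, and the end of the stream is an explicit completion signal.
class QueueLikePublisherDemo {
    static List<Integer> pushWhileConsuming(int count) {
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // Subscribe first: SubmissionPublisher is hot, so items submitted earlier would be missed.
        CompletableFuture<Void> done = publisher.consume(received::add);

        for (int i = 1; i <= count; i++) {
            publisher.submit(i); // pushed while the consumer may already be processing
        }
        publisher.close(); // completion signal: no more items will come
        done.join();       // wait until the consumer has received onComplete
        return received;
    }
}
```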

    public Mono<List<DTO>> addFoos() {
        return getData()
            .flatMap(dataHolder -> Flux.fromIterable(dataHolder.getResults())
                .map(data -> new DTO(data.getFoo()))        // synchronous 1:1 transform
                .flatMap(dto -> save(dto).thenReturn(dto))  // emit the dto once its save completes
                .collectList());                            // Mono<List<DTO>> after all saves
    }
    

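    Here you can see that I extract the list, place it into a Flux, and continue the chain from that Flux. Use map for synchronous one-to-one transformations, and flatMap when the function returns a publisher, such as the save call. flatMap subscribes to those inner publishers concurrently, so the order of the results is not guaranteed; use concatMap (one at a time) or flatMapSequential if you need to keep the input order.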

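    I then save each item. I don't know exactly what your DAO returns, so I use thenReturn to ignore whatever save emits and emit my dto instead once the save has completed. Note that thenReturn is defined on Mono; if your save method returns a Flux, as createFoo does, use .then(Mono.just(dto)) instead.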

    Finally, collectList gathers all the DTOs into a Mono<List<DTO>> that completes only after every save has completed, and that is what gets returned to the calling client.
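    For comparison with the broken version, here is the same composition idea as a plain-JDK analogy with CompletableFuture (assuming Java 16+ for record and Stream.toList); it is not Reactor and not the answer's exact code. ComposedChainDemo, Dto and save are hypothetical names. Each save returns a future that is kept rather than dropped, thenApply(ignored -> dto) plays the role of thenReturn(dto), and allOf followed by joining the futures plays the role of collectList, so the resulting list completes only after every save has finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy (CompletableFuture, NOT Reactor) of
// fromIterable(...).map(...).flatMap(dto -> save(dto).thenReturn(dto)).collectList().
class ComposedChainDemo {
    record Dto(String foo) {}

    // Hypothetical async save: "stores" the dto and returns a result we don't care about.
    static CompletableFuture<Integer> save(Dto dto, ConcurrentLinkedQueue<Dto> db, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            db.add(dto);
            return 1; // e.g. an affected-row count
        }, pool);
    }

    static CompletableFuture<List<Dto>> addFoos(List<String> results,
                                                ConcurrentLinkedQueue<Dto> db,
                                                ExecutorService pool) {
        List<CompletableFuture<Dto>> saves = results.stream()
                .map(Dto::new)                                             // like map(data -> new DTO(...))
                .map(dto -> save(dto, db, pool).thenApply(ignored -> dto)) // like thenReturn(dto)
                .toList();
        // Like collectList(): completes only when every save has completed.
        return CompletableFuture.allOf(saves.toArray(new CompletableFuture[0]))
                .thenApply(v -> saves.stream().map(CompletableFuture::join).toList());
    }
}
```

    One difference: joining the futures in list order keeps the input order, which is closer to Reactor's flatMapSequential than to flatMap.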