I'm working with Spring Webflux + R2DBC to insert some data into a database.
Call for data -> Put in database -> Return data.
Controller
@RestController
public class UtilController {
    private final FooService service;
    @Autowired
    public UtilController(FooService service) { this.service = service; }
    @GetMapping(path = "/load-data")
    public Mono<List<Foo>> loadFoos() { return service.addFoos(); }
}
Service
public Mono<List<Foo>> addFoos() {
    return webClient.getSomeStuff()
        .map(results -> results.stream()
            .map(/* ...transform to domain model */)
            .toList())
        .map(foos -> {
            // This is the problem site.
            List<Foo> buf = new ArrayList<>();
            foos.forEach(f -> {
                fooDao.createFoo(f)
                    .doOnNext(buf::add)
                    .next()
                    .subscribe();
            });
            return buf;
        });
}
DAO
public Flux<Foo> createFoo(final Foo foo) {
    var query = "<INSERT_STUFF_STATEMENT>";
    // "publisher" is private final Publisher<? extends Connection> publisher;
    var connection = Mono.from(publisher);
    return connection.flatMapMany(c -> c.createStatement(query)
            .bind("STUFF", foo.getStuff())
            .execute())
        .map(result -> Foo.builder()
            .stuff(whatever)
            .build());
}
The main problem is that I can't figure out how to make it wait for all the results to be returned and added to the eventual List<Foo> before returning to the controller. There must be some combination of map/flatMap/flatMapMany/etc. that I'm missing to do this correctly. As it stands, only about 80% of the data gets into the database and about 10% into the list before the controller sends back the response and the processing shuts down.
Okay, first of all: when a client calls a reactive endpoint, the framework subscribes to the publisher you returned, and that subscription travels through your operators in the opposite direction, upstream, until it finds a producer.
A producer is something that can produce items for us, for instance a Mono<T> or a Flux<T>.
Strictly speaking, Reactor calls this the subscription phase (the assembly phase is when you chain the operators together). If a publisher is not linked into the chain, the subscription never reaches it, and that is what we call breaking the reactive chain.
When the chain is broken, the framework can't find that producer, which in turn means that it can't produce items.
You have probably heard the phrase "nothing happens until you subscribe", but this also implies that the chain must be intact between the subscriber and the producer.
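To make "nothing happens until you subscribe" concrete, here is a minimal sketch. The class LazyChainDemo and the method fakeInsert are hypothetical stand-ins for a DAO call, not part of the question's code, and block() is used only so the demo can run from a plain main method.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyChainDemo {
    // Counts how many times the (hypothetical) insert actually ran.
    static final AtomicInteger inserts = new AtomicInteger();

    // Stand-in for a DAO call: the callable body runs only when someone subscribes.
    static Mono<String> fakeInsert(String value) {
        return Mono.fromCallable(() -> {
            inserts.incrementAndGet();
            return value;
        });
    }

    public static void main(String[] args) {
        // Building the chain does not execute the insert.
        Mono<String> chain = fakeInsert("a").map(String::toUpperCase);
        System.out.println("after building the chain: " + inserts.get());

        // block() subscribes, and only now does the insert run.
        String result = chain.block();
        System.out.println("after subscribing: " + inserts.get() + ", result=" + result);
    }
}
```

In a WebFlux controller you would return the Mono instead of calling block(); the framework does the subscribing.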
List<Foo> buf = new ArrayList<>();
foos.forEach(f -> {
    // createFoo returns a Flux here that you are not handling
    fooDao.createFoo(f)
        .doOnNext(buf::add)
        .next()
        .subscribe();
});
return buf;
The comment in the code above marks where you are breaking the chain.
You get foos and iterate through its items. For each foo you save to the database, you update state (adding the item to the list). What you are missing is that fooDao.createFoo returns a producer (a Mono or a Flux) that you are not connecting to the rest of the chain.
You should avoid shared mutable state in reactive programming; prefer pure functions.
You probably saw that nothing was getting stored, so you slapped on a subscribe to try to "activate" the saving.
In general, whoever starts the entire chain is the subscriber. In your case, that is the client calling your server, so you should not be subscribing anywhere in your own code. Your job is to build the chain that produces items, so that items can be produced for the calling client.
So what you should have done is something like:
return fooDao.createFoo(item);
You always need to handle and chain on the returns. You can't ignore them.
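As a rough illustration of the difference, the sketch below contrasts a detached subscribe with a chained return. ChainDemo and this createFoo are hypothetical stand-ins (a delayed in-memory "insert" working on strings), not the asker's DAO, and block() is used only to run the demo from main.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ChainDemo {
    // Hypothetical stand-in for fooDao.createFoo: emits the saved value after a short delay.
    static Flux<String> createFoo(String foo) {
        return Flux.just(foo + "-saved").delayElements(Duration.ofMillis(50));
    }

    // Broken: each inner Flux is subscribed on its own, so nothing downstream waits for it.
    static Mono<List<String>> detached(List<String> foos) {
        return Mono.just(foos).map(items -> {
            List<String> buf = new CopyOnWriteArrayList<>();
            items.forEach(f -> createFoo(f).doOnNext(buf::add).subscribe());
            return List.copyOf(buf); // snapshot taken without waiting for the inserts
        });
    }

    // Chained: the inner publisher is returned to flatMap, so it becomes part of the pipeline.
    static Mono<List<String>> chained(List<String> foos) {
        return Flux.fromIterable(foos)
                .flatMap(f -> createFoo(f).next())
                .collectList();
    }

    public static void main(String[] args) {
        List<String> foos = List.of("a", "b", "c");
        // The detached version usually reports fewer than 3 items; the chained one reports all 3.
        System.out.println("detached: " + detached(foos).block().size() + " of 3");
        System.out.println("chained:  " + chained(foos).block().size() + " of 3");
    }
}
```

The chained version needs no shared list at all: collectList only emits once every inner publisher has completed.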
I rewrote your code so that it does not iterate over a List<T>; in general you should avoid iterating over a list yourself in reactive Java. Instead, place your list into a Flux so that it produces the items from the list. I do that with Flux#fromIterable.
A list is of finite length, while a Flux can be of infinite length. It will just produce items for us as long as it has items to produce, and depending on the source, items can even be fed into it while it is producing. It's more like a queue than a list.
public Mono<List<DTO>> addFoos() {
    return getData()
        .flatMap(dataHolder -> {
            return Flux.fromIterable(dataHolder.getResults())
                .flatMap(data -> Mono.just(new DTO(data.getFoo())))
                // assumes save(...) returns a Mono; for a Flux, use save(dto).then(Mono.just(dto))
                .flatMap(dto -> save(dto).thenReturn(dto))
                .collectList();
        });
}
Here you can see that I extract the list, place it into a Flux, and then continue the chain from the Flux. Use map for plain synchronous transformations and flatMap when the step itself returns a Mono or a Flux (such as the save); keep in mind that with flatMap the order of the results is not guaranteed.
I then save each item, but I have no idea what your DAO returns, so by using thenReturn I basically ignore what save returns and tell it to return my dto instead.
Then I collect everything into a list, and we can return this list to the calling client.
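If the order of the returned list matters, one option is to swap flatMap for concatMap or flatMapSequential. The sketch below is only an illustration: OrderingDemo and this save are hypothetical, and the artificial delays are there to make later items finish first.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;

public class OrderingDemo {
    // Hypothetical save: higher ids finish sooner, to provoke reordering.
    static Mono<Integer> save(int id) {
        return Mono.just(id).delayElement(Duration.ofMillis(100L - id * 20L));
    }

    // Saves run concurrently, but results are emitted in source order.
    static Mono<List<Integer>> saveAllSequential(List<Integer> ids) {
        return Flux.fromIterable(ids)
                .flatMapSequential(OrderingDemo::save)
                .collectList();
    }

    // Saves run one at a time, in source order.
    static Mono<List<Integer>> saveAllOneByOne(List<Integer> ids) {
        return Flux.fromIterable(ids)
                .concatMap(OrderingDemo::save)
                .collectList();
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3);
        System.out.println(saveAllSequential(ids).block()); // [1, 2, 3]
        System.out.println(saveAllOneByOne(ids).block());   // [1, 2, 3]
    }
}
```

concatMap trades concurrency for strict ordering, while flatMapSequential keeps the saves concurrent and buffers results until they can be emitted in order.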