I'm having trouble understanding how to achieve my goal with a reactive approach. Let's assume I have a controller that returns a Flux:
@PostMapping(value = "/mutation/stream/{domainId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Mutation> getMutationReactive(@RequestBody List<MutationRequest> mutationRequests, @PathVariable Integer domainId) {
    return mutationService.getMutations(mutationRequests, domainId);
}
The service currently uses .subscribeOn(Schedulers.boundedElastic()), because it calls blocking code that is wrapped in a Callable:
public Flux<Mutation> getMutations(List<MutationRequest> mutationRequests, int domainId) {
    return Flux.fromIterable(mutationRequests)
            .subscribeOn(Schedulers.boundedElastic())
            .flatMap(mutationRequest -> getMutation(mutationRequest.getGameId(), mutationRequest.getTypeId(), domainId));
}
getMutation() makes the blocking calls, currently wrapped in a Callable:
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return Mono.fromCallable(() -> {
        Mutation mutation = mutationProvider.findByGameIdAndTypeId(gameId, typeId).block(); // mutationProvider.findByGameIdAndTypeId() returns Mono<Mutation>
        if (mutation == null) {
            throw new RuntimeException("Mutation was not found by gameId and typeId");
        }
        State state = stateService.getStateByIds(mutation.getId(), domainId).blockFirst(); // stateService.getStateByIds() returns Mono<State>
        if (state == null || state.getValue() == null) {
            log.info("Requested mutation with gameId[%s] typeId[%s] domainId[%s] is disabled. Value is null.".formatted(gameId, typeId, domainId));
            return null;
        }
        mutation.setTemplateId(state.getTemplateId());
        return mutation;
    });
}
How should I rewrite getMutation() to use reactive streams instead of calling .block() inside a Callable? Basically, I need to first retrieve the Mutation from the DB; then, using the mutation's ID, get its state from another service; then, if the state and its value are not null, set the state's templateId on the mutation and return it, or otherwise return null.
I've tried something like this:
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
            .flatMap(mutation -> {
                stateService.getStatesByIds(mutation.getId(), domainId).flatMap(state -> {
                    if (state != null && state.getValue() != null) {
                        mutation.setTemplateId(state.getTemplateId());
                    }
                    //TODO if state/value is null -> need to propagate further to return null instead of mutation...
                    return Mono.justOrEmpty(state);
                });
                return Mono.just(mutation);
            });
}
But it's obviously incorrect: nothing subscribes to stateService.getStatesByIds(mutation.getId(), domainId), and I would like to return null if the retrieved state of the mutation, or its value, is null.
You are ignoring the value of the inner flatMap, hence the warning. Without having tried it, you need something like this:
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
            .flatMap(mutation -> {
                return stateService.getStatesByIds(mutation.getId(), domainId).flatMap(state -> {
                    if (state != null && state.getValue() != null) {
                        mutation.setTemplateId(state.getTemplateId());
                        return Mono.just(mutation);
                    }
                    return Mono.empty();
                });
            });
}
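To see the shape of that fix without pulling in Reactor, here is a rough stand-in built on java.util.Optional, whose flatMap, filter and map compose in the same way as the ones on Mono. It is only an analogy: Optional is synchronous, and the Mutation and State classes and both lookup methods below are made-up placeholders, not your real types.

```java
import java.util.Optional;

public class OptionalChainSketch {
    // Hypothetical stand-ins for the real domain types.
    public static final class State {
        public final String value;
        public final Integer templateId;
        public State(String value, Integer templateId) {
            this.value = value;
            this.templateId = templateId;
        }
    }

    public static final class Mutation {
        public final int id;
        public Integer templateId;
        public Mutation(int id) { this.id = id; }
    }

    // Placeholder lookup: a non-positive gameId simulates "not found".
    public static Optional<Mutation> findMutation(int gameId, int typeId) {
        return gameId > 0 ? Optional.of(new Mutation(gameId * 10 + typeId)) : Optional.empty();
    }

    // Placeholder lookup: domain 0 simulates a disabled mutation (state present, value null).
    public static Optional<State> findState(int mutationId, int domainId) {
        return Optional.of(domainId == 0 ? new State(null, 7) : new State("on", 7));
    }

    public static Optional<Mutation> getMutation(int gameId, int typeId, int domainId) {
        return findMutation(gameId, typeId)
                // The inner chain is the lambda's result, so it is not silently dropped.
                .flatMap(mutation -> findState(mutation.id, domainId)
                        .filter(state -> state.value != null) // disabled -> empty
                        .map(state -> {
                            mutation.templateId = state.templateId;
                            return mutation;
                        }));
    }

    public static void main(String[] args) {
        System.out.println(getMutation(1, 2, 5).map(m -> m.templateId)); // Optional[7]
        System.out.println(getMutation(1, 2, 0).isPresent());            // false
        System.out.println(getMutation(0, 2, 5).isPresent());            // false
    }
}
```

The empty Optional plays the role of Mono.empty() here: "disabled" and "not found" both end up as an empty result rather than a null.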
Assuming getStatesByIds returns a Mono, you can also rewrite the inner flatMap as a regular map (the outer one has to stay a flatMap, because its lambda returns a Mono) and use filter for the null check. defaultIfEmpty is only needed if you want a fallback Mutation; it takes a plain value, not a Mono, and an empty Mono already means "nothing to return":
private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
            .flatMap(mutation -> stateService.getStatesByIds(mutation.getId(), domainId)
                    // Reactor never emits null elements, so only the value needs checking
                    .filter(state -> state.getValue() != null)
                    .map(state -> {
                        mutation.setTemplateId(state.getTemplateId());
                        return mutation;
                    }));
}
This is just off the top of my head, and I have no idea what some of the return types (Flux or Mono) are for your own APIs.
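On the "return null" part: a Mono cannot carry null, so an empty Mono is the reactive equivalent, and the flatMap in getMutations simply emits nothing for a request whose inner Mono completes empty. A rough stdlib analogy, assuming Java 9+ for Optional::stream and using a made-up lookup method in place of getMutation():

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class SkipEmptySketch {
    // Hypothetical stand-in for getMutation(): even ids are "disabled" and yield empty.
    public static Optional<String> lookup(int gameId) {
        return gameId % 2 == 0 ? Optional.empty() : Optional.of("mutation-" + gameId);
    }

    // Loosely mirrors Flux.fromIterable(requests).flatMap(r -> getMutation(...)):
    // an empty result contributes no element, so no null ever reaches the caller.
    public static List<String> lookupAll(List<Integer> gameIds) {
        return gameIds.stream()
                .map(SkipEmptySketch::lookup)
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lookupAll(List.of(1, 2, 3, 4))); // [mutation-1, mutation-3]
    }
}
```

So the client of the event stream just sees fewer elements when some mutations are disabled; if it needs to know which ones were skipped, that would have to be modelled explicitly, for example with a wrapper type.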