I have a module that accepts entity IDs and a "resolution type" as parameters, and then gathers data (primarily) asynchronously via multiple operations that return Fluxes. The resolution is broken into multiple (again, primarily) asynchronous operations that each gather a different type of data contributing to the resolution.
I say "primarily" because some resolution types require one or more preliminary operations that must happen synchronously to provide information for the remaining asynchronous Flux operations. While this synchronous operation is taking place, at least a portion of the overall asynchronous resolution can begin, and I would like to start those Flux operations during that time. Then, once the synchronous data has been resolved, I can get each Flux for the remaining operations underway. Some resolution types will have all Flux operations returning data, while others gather less information, and some of the Flux operations will remain empty.
The resolution operations are time-expensive, and I would like to start some Flux operations earlier so that I can compress the time a bit -- that is quite important for what I am accomplishing. So eager subscription is ideal, as long as I can guarantee that I will not miss any item emission.
With that in mind, how can I:
Flux.empty()
)collectList()
on them to produce a Mono
.Flux
operations should start before some of the others, how can I start them, and ensure that I do not miss any data? And if I start a name resolution Flux, for example, can I add to it, as in item 2 above? Let's say that I want to start retrieving some data, then perform a synchronous operation, and then create another name resolution Flux from the result of the synchronous operation. Can I append this new Flux to the original name resolution Flux, since it will be returning the same data type? I am aware of Flux.merge(), but it would be convenient to work with a single Flux reference that I can keep adding to, if possible. Will I need a collection object, like a list, and then use a merge operation? Initially, I thought about using a ConnectableFlux, until I realized that it is for connecting multiple subscribers, rather than for connecting multiple publishers. Connecting multiple publishers is what I think would be a good answer for my need, unless this is a common pattern that can be handled in a better way.
I have only been doing reactive programming for a short time, so please be patient with the way I am trying to describe what I want to do. If I can better clarify my intentions, please let me know where I have been unclear, and I will gladly attempt to clear it up. Thanks in advance for your time and help!
EDIT: Here is the final Kotlin version, nice and concise:
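import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.random.Random

private val log = KotlinLogging.logger {}

class ReactiveDataService {

    // Builds the shared upstream: 9 + 8 + 7 random values, generated in parallel,
    // collected into one list and cached so that every subscriber reuses the same result.
    private val createMono: () -> Mono<List<Int>> = {
        Flux.just(9, 8, 7)
            .flatMap {
                Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
                    .parallel()
                    .runOn(Schedulers.boundedElastic())
            }
            .collectList()
            .cache()
    }

    // Combines the two downstream results into a single report string.
    private val processResults: (List<String>, List<String>) -> String =
        { d1, d2 -> "\n\tdownstream 1: $d1\n\tdownstream 2: $d2" }

    // Multiplies each value and formats it right-aligned to a width of 3.
    private val convert: (List<Int>, Int) -> Flux<String> =
        { data, multiplier -> Flux.fromIterable(data.map { String.format("%3d", it * multiplier) }) }

    fun doQuery(): String? {
        val mono = createMono()
        // Both downstreams derive from the same cached Mono.
        val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
        val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
        return Mono.zip(downstream1, downstream2, processResults).block()
    }
}

fun main() {
    val service = ReactiveDataService()
    val start = System.currentTimeMillis()
    val result = service.doQuery()
    log.info("{}\n\tTotal time: {}ms", result, System.currentTimeMillis() - start)
}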
And the output:
downstream 1: [ 66, 39, 40, 88, 97, 35, 70, 91, 27, 12, 84, 37, 35, 15, 45, 27, 85, 22, 55, 89, 81, 21, 43, 62]
downstream 2: [132, 78, 80, 176, 194, 70, 140, 182, 54, 24, 168, 74, 70, 30, 90, 54, 170, 44, 110, 178, 162, 42, 86, 124]
Total time: 209ms
It sounds like an ideal job for Reactor. The synchronous calls can be wrapped to return Fluxes (or Monos), using an elastic scheduler so that they can be executed in parallel. Then, using the various operators, you can compose them all together into a single Flux that represents the result. Subscribe to that Flux and the whole machine will kick off.
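As a rough illustration of that idea, here is a minimal sketch, assuming Reactor 3.4 or later on the classpath. The names blockingLookup, namesFor, and WrapBlockingSketch are hypothetical stand-ins and not part of the question's code. The blocking call is wrapped with Mono.fromCallable and moved to the bounded elastic scheduler, the Flux that depends on its result is derived with flatMapMany, and Flux.merge subscribes to both sources eagerly so the independent Flux does not wait for the blocking call.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class WrapBlockingSketch {

    // Hypothetical stand-in for the preliminary synchronous operation.
    public static String blockingLookup(String entityId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return entityId + "-key";
    }

    // Hypothetical stand-in for one asynchronous resolution step.
    public static Flux<String> namesFor(String key) {
        return Flux.just(key + ":a", key + ":b");
    }

    public static List<String> resolve(String entityId) {
        // Does not depend on the blocking call, so it can emit right away.
        Flux<String> early = namesFor(entityId);

        // The blocking call runs on a bounded elastic worker; its result
        // feeds a second Flux of the same element type.
        Flux<String> late = Mono.fromCallable(() -> blockingLookup(entityId))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMapMany(WrapBlockingSketch::namesFor);

        // merge subscribes to both sources up front and interleaves their items.
        return Flux.merge(early, late).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(resolve("e1"));
    }
}
```

Nothing runs until the merged Flux is subscribed (here through block()), so no emissions are lost: each source only starts producing once merge has subscribed to it.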
I think you need to use Mono.flatMapMany instead of Flux.usingWhen.
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactiveDataService {

    public static void main(final String[] args) {
        ReactiveDataService service = new ReactiveDataService();
        service.doQuery();
    }

    // First downstream: passes the values through unchanged.
    private Flux<Integer> process1(final List<Integer> data) {
        return Flux.fromIterable(data);
    }

    // Second downstream: doubles every value.
    private Flux<Integer> process2(final List<Integer> data) {
        return Flux.fromIterable(data).map(i -> i * 2);
    }

    // Combines both downstream results once they are complete.
    private String process3(List<Integer> downstream1, List<Integer> downstream2) {
        System.out.println("downstream 1: " + downstream1);
        System.out.println("downstream 2: " + downstream2);
        return "Done";
    }

    private void doQuery() {
        // Shared upstream: 9 + 8 + 7 slow random values, generated in parallel
        // and cached so that both downstreams reuse a single execution.
        final Mono<List<Integer>> mono =
            Flux.just(9, 8, 7)
                .flatMap(
                    limit ->
                        Flux.fromStream(
                                Stream.generate(() -> new Random().nextInt(100))
                                    .peek(
                                        i -> {
                                            try {
                                                Thread.sleep(500); // simulate an expensive operation
                                            } catch (InterruptedException ignored) {
                                            }
                                        })
                                    .limit(limit))
                            .parallel()
                            .runOn(Schedulers.boundedElastic()))
                .collectList()
                .cache();

        final Mono<List<Integer>> downstream1 = mono.flatMapMany(this::process1).collectList();
        final Mono<List<Integer>> downstream2 = mono.flatMapMany(this::process2).collectList();

        Mono.zip(downstream1, downstream2, this::process3).block();
    }
}
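The cache() call on the shared Mono is what lets both downstreams reuse one execution of the expensive upstream. The following is a small sketch of that effect, assuming Reactor is on the classpath. SharedSourceSketch and sourceSubscriptions are hypothetical names introduced only for this illustration. It counts how often the upstream is subscribed, with and without cache().

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SharedSourceSketch {

    // Returns how many times the upstream source was subscribed to
    // while resolving two downstreams derived from it.
    public static int sourceSubscriptions(boolean useCache) {
        AtomicInteger subscriptions = new AtomicInteger();

        Mono<List<Integer>> source = Flux.range(1, 3)
                .doOnSubscribe(s -> subscriptions.incrementAndGet())
                .collectList();

        // With cache(), the first subscriber triggers the source and
        // later subscribers receive the stored result.
        Mono<List<Integer>> shared = useCache ? source.cache() : source;

        Mono<List<Integer>> d1 =
                shared.flatMapMany(list -> Flux.fromIterable(list)).collectList();
        Mono<List<Integer>> d2 =
                shared.flatMapMany(list -> Flux.fromIterable(list).map(i -> i * 2)).collectList();

        Mono.zip(d1, d2).block();
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println("with cache:    " + sourceSubscriptions(true));
        System.out.println("without cache: " + sourceSubscriptions(false));
    }
}
```

With cache() the source is subscribed once, and without it each downstream triggers its own run. For the eager start asked about in the question, calling subscribe() on the cached Mono before the synchronous work should start the upstream early, and later subscribers would then receive the cached result rather than missing it.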