I have a primitive Flux
of String
s, and run this code in the main()
method.
package com.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
import java.util.Arrays;
import java.util.List;
public class Parallel {
private static final Logger log = Loggers.getLogger(Parallel.class.getName());
private static List<String> COLORS = Arrays.asList("red", "white", "blue");
public static void main(String[] args) throws InterruptedException {
Flux<String> flux = Flux.fromIterable(COLORS);
flux
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub", 1))
.subscribe(value -> {
log.info("==============Consumed: " + value);
});
}
}
If you try to run this code the app never stops running and you need to stop it manually.
If I replace .newParallel()
with .parallel()
everything works as expected and the app finishes normally.
Why can't it finish running on its own? Why does it hang? What is the reason for this behavior?
If you run this code as a JUnit test it works fine and it doesn't hang.
Scheduler
instances that you create yourself with newXxx
factory methods are created in non-daemon mode by default, which means that it can prevent the JVM from exiting.
JUnit calls System.exit()
when all the tests have run, which explains why the test scenario doesn't hang.
In this context, Schedulers.newSingle()
and Schedulers.newParallel()
variants are the worst "offenders", because the threads created are not culled after an inactivity timeout, unlike with Schedulers.newBoundedElastic()
.
If in a real world scenario you have a well defined application lifecycle, you could store the Scheduler
instances somewhere (eg. as beans) and ensure each Scheduler#dispose()
is called at the end of the application lifecycle.
Easier solution: create the Schedulers
explicitly with daemon == true
using the relevant factory overload.