I am trying to create a flow where a flux emits 10 items, each in parallel, with each item sleeping for 1s. Since each item is being published on a separate thread, I expect the entire process to take 1s. But the logs show that it's taking 10s instead.
I tried changing subscribeOn to publishOn, map to doOnNext. But none of them seem to work.
I am new to Reactor and am trying to understand where I am going wrong. Any help would be most appreciated. Thanks
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.publishOn(Schedulers.elastic())
.map(count -> {
logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
})
.blockLast();
}
2020-03-30 16:17:29.799 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 10
You have to first create a parallel flux by calling parallel
method and you have to use runOn
to achieve parallelism.
Flux.range(1,10)
.parallel()
.runOn(Schedulers.elastic())
.map(count -> {
System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
}).subscribe();
Schedulers.boundedElastic()
as using Scheduler.elastic()
is discouragedparallel
by default will create threads based on your CPU core. If you want more threads use parallel(10)
- I think this is what you want to see.