I need to handle different types of events in a strict one by one manner, but in a background thread.
According to the documentation, the next code, Schedulers.from(executor, false, true);
, should cover my requirements, but in reality it doesn't.
The code:
ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();
subject1.observeOn(scheduler).subscribe(log::info);
subject2.observeOn(scheduler).subscribe(log::info);
subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");
log.info("Test activity");
Has the following output:
22:19:05.313 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
22:19:05.313 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
Which shows, that events handling is executed in a greedy manner, when every Observer gives all events before freeing the scheduler
, which contradicts with the documentation.
If .observeOn(scheduler)
is replaced with the .subscribeOn(scheduler)
the output is the next:
22:23:56.162 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
Which executes all events in the same thread, which contradicts with the whole idea of .subscribeOn
.
Is this a bug or there is a way to make it work as expected in the documentation? The version is io.reactivex.rxjava3:rxjava:3.1.9
.
It is not a bug with the Scheduler
but the consequence of the observeOn
which always operates in a greedy manner. In the first case, because all items to the first sequence was available practically immediately to the observeOn, it emits those on the same thread in one executor run.
You can use another operator which creates one task per item such as delay
with zero delay to get a better interleaving:
subject1.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
subject2.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
The second case is working as intended because using subscribeOn
on a Subject
has no effect on the items it delivers. In your case, the items were emitted and thus processed on the same thread as it would happen without subscribeOn
.