I can't find the difference in using Mono.block() and Mono.subscribe()
For me when using both methods the code behaves exactly the same, and it should not.
For Mono.block(), my expectation is that the calling thread will block and wait for the result, yet that same thread is used inside the Mono's map method, so it basically unblocks itself.
I have following fragment of code using Mono.block():
void doBlocking() {
    final var myMono = Mono.just("test").map(elem -> {
        System.out.printf("On thread: [%s] inside map\n", Thread.currentThread().getName());
        return elem;
    });
    String value;
    System.out.printf("On thread: [%s] before block\n", Thread.currentThread().getName());
    value = myMono.block();
    System.out.printf("On thread: [%s] after block\n", Thread.currentThread().getName());
    System.out.println(value);
}
When I call this code I receive the following:
On thread: [main] before block
On thread: [main] inside map
On thread: [main] after block
test
From my understanding, Mono.block() is a blocking method, so I would assume that the thread will get blocked as it would when acquiring a lock. But instead thread is used to execute code inside the map of Mono, which means that it is not blocked at all.
For Mono.subscribe(), I would expect that the thread calling subscribe will just continue and not wait for results, but it does behave exactly the same as when using Mono.block().
I have a similar fragment but now using subscribe instead of block.
void doSubscribing() {
    final var myMono = Mono.just("test").map(elem -> {
        System.out.printf("On thread: [%s] inside map\n", Thread.currentThread().getName());
        return elem;
    });
    AtomicReference<String> value = new AtomicReference<>();
    System.out.printf("On thread: [%s] before subscribe\n", Thread.currentThread().getName());
    myMono.subscribe(value::set);
    System.out.printf("On thread: [%s] after subscribe\n", Thread.currentThread().getName());
    System.out.println(value);
}
When I call this code again, I am getting the same results:
On thread: [main] before subscribe
On thread: [main] inside map
On thread: [main] after subscribe
test
I would expect that when I call subscribe, the current thread will continue working possibly displaying:
On thread: [main] after subscribe
null
In my case both block and subscribe behave exactly the same, so what's the real difference?
It's your assumptions that are faulty, and your tests are not set up to show the difference.
Nothing happens until you subscribe (or block).
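A minimal sketch of that laziness, assuming reactor-core is on the classpath. The class name LazyMonoDemo and the counter are made up for illustration; the point is only that the map lambda does not run when the Mono is assembled, but once block() subscribes to it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class LazyMonoDemo {

    // Hypothetical helper: returns {map calls before block(), map calls after block()}.
    static int[] mapCallsBeforeAndAfterBlock() {
        AtomicInteger mapCalls = new AtomicInteger();
        // Assembling the pipeline only describes the work; nothing runs yet.
        Mono<String> myMono = Mono.just("test").map(elem -> {
            mapCalls.incrementAndGet();
            return elem;
        });
        int before = mapCalls.get();
        myMono.block(); // subscribing is what triggers the map lambda
        int after = mapCalls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = mapCallsBeforeAndAfterBlock();
        System.out.printf("map calls before block: %d, after block: %d%n", calls[0], calls[1]);
    }
}
```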
Reactor is made to sort of abstract away the threading for you so that you don't need to use mutexes, atomics, locks, synchronization etc. That's the whole point of it. So where your code is executed, on which thread, on the same thread etc. shouldn't matter to you. What does matter is that in a reactive server application the work typically runs on a small pool of non-blocking (NIO event-loop) threads, which means you should never block them.
From my understanding Mono.block() is blocking method, so I would assume that the thread will get blocked as it would when acquiring a lock. But instead thread is used to execute code inside map of Mono which means that it is not blocked at all.
Here is your first assumption: you think a different thread will run this. That is not certain, as Reactor will only use different threads when needed; otherwise it will just run the code async, which doesn't necessarily mean it runs on a different thread. You are mixing up async and parallel, which are not the same thing.
Also, locks and mutexes are only needed when multiple threads want to access the same resource, and there is no resource here that multiple threads want to access.
What your blocking code is basically saying:
void doBlocking() {
    // I want to do this some time in the future
    final var myMono = Mono.just("test").map(elem -> {
        System.out.printf("On thread: [%s] inside map\n", Thread.currentThread().getName());
        return elem;
    });
    String value;
    System.out.printf("On thread: [%s] before block\n", Thread.currentThread().getName());
    // Here I want you to run the above code, and we will wait here until it is done.
    // This can happen on the same thread or a different thread; we don't care. It will
    // most likely be the same thread for efficiency, because there is no reason to
    // start a new thread for it.
    value = myMono.block();
    // Print the results
    System.out.printf("On thread: [%s] after block\n", Thread.currentThread().getName());
    System.out.println(value);
}
And your console output shows exactly that.
When it comes to subscribe, though, we run the code with a callback. You basically subscribe to something that is to be performed, but we won't wait for the result; instead we attach a callback that receives it.
void doSubscribing() {
    // Once again, declare we want to do this in the future sometime.
    final var myMono = Mono.just("test").map(elem -> {
        System.out.printf("On thread: [%s] inside map\n", Thread.currentThread().getName());
        return elem;
    });
    // There is not really a need for an AtomicReference here, because there is
    // no guarantee that this will be executed on a different thread; it's just
    // you who assumes so.
    AtomicReference<String> value = new AtomicReference<>();
    System.out.printf("On thread: [%s] before subscribe\n", Thread.currentThread().getName());
    // Here the code is run with a callback, and since there are no delays
    // the code runs so fast that the callback is executed immediately,
    // possibly on the same thread for efficiency, because switching threads
    // is resource demanding.
    myMono.subscribe(value::set);
    // Here the result is printed
    System.out.printf("On thread: [%s] after subscribe\n", Thread.currentThread().getName());
    System.out.println(value);
}
So there are not always different threads running underneath. You can force Reactor to run on multiple threads, but if it can avoid it, it will.
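As a rough illustration of forcing a thread switch, here is a sketch that assumes reactor-core is on the classpath and uses subscribeOn with Schedulers.boundedElastic() as one possible way to do it. The class name SubscribeOnDemo and the helper mapThreadName are made up for this example.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnDemo {

    // Hypothetical helper: returns the name of the thread that ran the map lambda.
    static String mapThreadName(boolean switchThread) {
        Mono<String> mono = Mono.just("test")
                .map(elem -> Thread.currentThread().getName());
        if (switchThread) {
            // Explicitly ask Reactor to subscribe (and so run map) on another scheduler.
            mono = mono.subscribeOn(Schedulers.boundedElastic());
        }
        // block() waits on the calling thread either way.
        return mono.block();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:       " + Thread.currentThread().getName());
        System.out.println("map, no subscribeOn: " + mapThreadName(false));
        System.out.println("map, subscribeOn:    " + mapThreadName(true));
    }
}
```

Without subscribeOn the map lambda runs on the caller's thread, as in your output; with it, the lambda runs on a boundedElastic worker thread while the caller waits in block().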
Second of all, computers are fast, like really really fast, so you need to introduce delays to show that subscribe and block are different, and also sort of give reactor a reason to run code the way you want it to.
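A minimal sketch of such a test, assuming reactor-core is on the classpath and using delayElement to introduce the delay. The class name BlockVsSubscribeDemo and its helper methods are invented for illustration, and the delay values are arbitrary.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

public class BlockVsSubscribeDemo {

    // Hypothetical helper: emits "test" only after the given delay.
    static Mono<String> delayed(Duration delay) {
        return Mono.just("test").delayElement(delay);
    }

    // subscribe() returns right away, so the callback has not fired yet
    // when we read the reference. The delayed callback runs on a Reactor
    // timer thread, so here the AtomicReference is actually useful.
    static String valueRightAfterSubscribe(Duration delay) {
        AtomicReference<String> value = new AtomicReference<>();
        delayed(delay).subscribe(value::set);
        return value.get();
    }

    // block() makes the calling thread wait until the value has arrived.
    static String valueAfterBlock(Duration delay) {
        return delayed(delay).block();
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMillis(500);
        System.out.println("after subscribe: " + valueRightAfterSubscribe(delay));
        System.out.println("after block:     " + valueAfterBlock(delay));
    }
}
```

With the delay in place, the value read right after subscribe is null, which is the output you expected in your question, while block still returns test.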
I did find these examples:
What is the difference between block() , subscribe() and subscribe(-)