kotlin reactor webflux

Reactor: why does my code work with publishOn but not with subscribeOn?


I'm learning how schedulers work in Reactor, so I tried to set a variable to 100 from inside a stream. With publishOn it works (number is set to 100), but with subscribeOn number is still 0 when I print it. Why?

@Test
fun reactor01_LearnSchedulers(){
  var number = 0
  Mono.just(100)
    .doOnNext { numb -> number = numb }
    // .subscribeOn(Schedulers.boundedElastic()) this gives result: 0
    .publishOn(Schedulers.boundedElastic()) // this gives result: 100
    .subscribe()

  println("Result : $number")
}

Update: after adding a second doOnNext that prints from inside the stream, I found that the results are not consistent.

fun reactor01_LearnSchedulers(){
  var number = 0
  Mono.just(100)
    .doOnNext { numb -> number = numb }
    .doOnNext { println("Result from inside: $number") }
    .publishOn(Schedulers.boundedElastic())
    .subscribe()

  println("Result from outside: $number")
  // with publishOn Result from inside: 100, Result from outside: 100
  // with subscribeOn Result from outside: 0, Result from inside: 100
}

Solution

  • The problem here is one of timing, and of understanding how thread switching works in Reactor.

    When you call subscribe(), the subscription happens on the calling thread, that is, the thread running the code that defines the reactive stream, unless subscribeOn() moves it to another thread.

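    publishOn() is independent of that: it does not change where the subscription happens. It only switches the thread for the operators that come after it in the chain, and in your code the doOnNext() callbacks come before it.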

    So with publishOn() but no subscribeOn(), subscribe() runs the source and your doOnNext() callbacks synchronously on the same thread as the test method. println() executes only after subscribe() has returned and the stream has been evaluated, so it prints 100. When you add subscribeOn(), the subscription and the execution of the flow happen on a boundedElastic thread, and the main thread is free to move on to the next statement right away. You are now in an asynchronous situation, and println() usually runs before number has been set, so you see 0.
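
    The difference can be made visible with a small sketch. It is not part of the original test: the helper name doOnNextThread is made up for illustration, and it assumes reactor-core is on the classpath. It records which thread runs the doOnNext() callback and uses block() instead of subscribe(), so the caller waits for the Mono to finish before reading the result.

    ```kotlin
    import reactor.core.publisher.Mono
    import reactor.core.scheduler.Schedulers

    // Hypothetical helper: returns the name of the thread that ran doOnNext.
    fun doOnNextThread(useSubscribeOn: Boolean): String {
        var seenOn = ""
        val source = Mono.just(100)
            .doOnNext { seenOn = Thread.currentThread().name }

        val shifted =
            if (useSubscribeOn) source.subscribeOn(Schedulers.boundedElastic())
            else source.publishOn(Schedulers.boundedElastic())

        // block() waits for completion, so seenOn is set in both cases
        // before this function returns.
        shifted.block()
        return seenOn
    }

    fun main() {
        println("caller thread: ${Thread.currentThread().name}")
        println("publishOn   -> doOnNext ran on ${doOnNextThread(false)}")
        println("subscribeOn -> doOnNext ran on ${doOnNextThread(true)}")
    }
    ```

    With publishOn() the callback should report the caller's thread, and with subscribeOn() a thread from the boundedElastic pool. Replacing subscribe() with block() in your test would likewise make the outer println() see 100 in both variants, because block() does not return until the Mono has completed.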

    I would recommend learning StepVerifier to understand how to test Reactor streams like these: https://projectreactor.io/docs/core/release/reference/#testing
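
    As a rough sketch of what such a test could look like for the stream in the question: the class name SchedulerLearningTest is hypothetical, and the snippet assumes JUnit 5 and the reactor-test module are available. StepVerifier subscribes to the Mono and waits for its signals, so the assertion does not depend on which thread the flow runs on.

    ```kotlin
    import org.junit.jupiter.api.Test
    import reactor.core.publisher.Mono
    import reactor.core.scheduler.Schedulers
    import reactor.test.StepVerifier

    class SchedulerLearningTest {

        @Test
        fun subscribeOnStillEmits100() {
            val mono = Mono.just(100)
                .subscribeOn(Schedulers.boundedElastic())

            // verifyComplete() subscribes and blocks until the expected
            // signals (one value, then completion) have been received.
            StepVerifier.create(mono)
                .expectNext(100)
                .verifyComplete()
        }
    }
    ```

    Asserting on the emitted value this way avoids writing to a shared variable from inside the stream and reading it from the test thread.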