
RxScala doOnCompleted not firing after call to .take()


I am new to RxScala Observables and am experiencing strange behaviour when using a combination of take(n) and doOnCompleted().

Below is a test with two chains. I believe the first one (with take(2) at the start) is correct: it prints the logs for the subscribe, next, completed and unsubscribe callbacks. The second one (with take(2) after doOnCompleted) never reaches the doOnCompleted callback.

import rx.lang.scala.Observable

object Tester extends App {

    val obs = Observable.from(List(1,2,3,4))

    val obsAddMethodsCorrect = obs.take(2)
        .doOnSubscribe( println("subscribe") )
        .doOnNext( n => println(s"next $n") )
        .doOnError( e => println("error") )
        .doOnCompleted( println("completed") )
        .doOnUnsubscribe( println("unsubscribe") )

    val obsAddMethodsInCorrect = obs
        .doOnError( e => println("error") )
        .doOnCompleted( println("completed") )
        .take(2)
        .doOnNext( n => println(s"next $n") )
        .doOnUnsubscribe( println("unsubscribe") )
        .doOnSubscribe( println("subscribe") )

    obsAddMethodsCorrect.toBlocking.subscribe()
    println("")
    println("The above seems correct. Below seems incorrect")
    println("")
    obsAddMethodsInCorrect.toBlocking.subscribe()

}

Current output of the above test:

subscribe
next 1
next 2
completed
unsubscribe

The above seems correct. Below seems incorrect

subscribe
next 1
next 2
unsubscribe

Why doesn't doOnCompleted() get fired in the second example?


Solution

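  • doOnCompleted() invokes its action only when an onCompleted() event passes through that point in the chain. In the second example, take(2) sits downstream of doOnCompleted(): after the second item it sends onCompleted() to its own subscriber and unsubscribes from the source. Because the source is unsubscribed before it has finished emitting items, it never fires onCompleted(), so the upstream doOnCompleted() action never runs. That part of the chain is effectively cancelled. To log completion of the truncated stream, place doOnCompleted() after take(2), as in the first example.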
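
A minimal sketch of the difference, assuming the same RxScala dependency as the question. It places one doOnCompleted() on each side of take(2) and records which callbacks run. The object name TakeCompletionSketch, the method recordedEvents and the event labels are made up for this illustration.

```scala
import rx.lang.scala.Observable
import scala.collection.mutable.ListBuffer

object TakeCompletionSketch {

  // Records which callbacks fire when doOnCompleted sits on both sides of take(2).
  def recordedEvents(): List[String] = {
    val events = ListBuffer.empty[String]

    Observable.from(List(1, 2, 3, 4))
      // Upstream of take(2): the source is unsubscribed after two items,
      // so no onCompleted event is expected to reach this point.
      .doOnCompleted { events += "source completed"; () }
      .take(2)
      .doOnNext { n => events += s"next $n"; () }
      // Downstream of take(2): take emits onCompleted itself after the second item.
      .doOnCompleted { events += "take completed"; () }
      .toBlocking
      .subscribe()

    events.toList
  }

  def main(args: Array[String]): Unit =
    recordedEvents().foreach(println)
}
```

Going by the output shown in the question, this should record "next 1", "next 2" and "take completed", with "source completed" missing. If you need a callback that runs whether the source completes or is cut short, doOnUnsubscribe() fires in both of the question's examples.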