rx-java kotlin

RxJava: Unsubscribe async observable from within another async production


What is the best (cleanest) way to unsubscribe from an async Observable that is produced inside a flatMap of another async Observable? Say I have a screen that shows a chat list for a user. Each chat item should display its own latest message. Here is the original code to illustrate:

fun AsyncInsideAsync() {
    /**
     * A chat that 'transmits' latest message
     */
    class Chat(val name: Int) {
        val lastMessage: PublishSubject<String> = PublishSubject.create()

        fun postMessage(message: Int) {
            lastMessage.onNext("Chat: $name, Message: $message")
        }
    }

    // List of chats for user.
    val chats: PublishSubject<Chat> = PublishSubject.create()

    ///////////////////////////////////
    //        ORIGINAL SEQUENCE      //
    ///////////////////////////////////
    val sequence = chats.flatMap { it.lastMessage }

    val subscriber = TestSubscriber<String>()
    sequence.subscribe(subscriber)

    // User has single chat in a chat-list
    val chat1 = Chat(1)
    chats.onNext(chat1)

    // Someone posts a message in Chat 1
    chat1.postMessage(1)
    subscriber.assertValues(
            "Chat: 1, Message: 1"
    )

    // Someone posts another message
    chat1.postMessage(2)
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2"
    )

    // Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created
    val chat2 = Chat(2)
    chats.onNext(chat2)

    // Someone posts a message to Chat 2
    chat2.postMessage(1)
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2",
            "Chat: 2, Message: 1"
    )

    // Someone out there posts a message to Chat 1, which is no longer visible to the user
    chat1.postMessage(3)

    // The actual result looks like this:
    //      "Chat: 1, Message: 1",
    //      "Chat: 1, Message: 2",
    //      "Chat: 2, Message: 1",
    //      "Chat: 1, Message: 3"
    // Chat 1 is still subscribed, so the assertion below fails
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2",
            "Chat: 2, Message: 1"
    )
}

What I came up with is using a subject (or a shared observable) to break the chain of inner subscriptions. But it looks weird:

    ///////////////////////////////////
    //        MODIFIED SEQUENCE      //
    ///////////////////////////////////
    val unsubscribe: PublishSubject<Boolean> = PublishSubject.create()
    val sequence = chats
            .doOnNext({ unsubscribe.onNext(true) })
            .doAfterTerminate({ unsubscribe.onNext(true) })
            .flatMap {
                it.lastMessage.takeUntil(unsubscribe)
            }

This approach works but looks frightening. Thanks a lot!


Solution

  • Assuming that the comment

    // Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created
    

    indicates that you'd like the sequence to stop relaying messages from the current Chat (chat1 in this case) once a new Chat is emitted on chats, you can accomplish this with the switchMap operator.

    val sequence = chats.switchMap { it.lastMessage }
    

    From the ReactiveX documentation:

    RxJava also implements the switchMap operator. It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe to and stop mirroring the Observable that was generated from the previously-emitted item, and begin only mirroring the current one.
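To make the difference concrete, here is a minimal sketch of the question's test rewritten with switchMap. It assumes RxJava 1.x (the `rx.*` packages, which the question's use of TestSubscriber suggests) is on the classpath. The function name `switchMapDropsPreviousChat` is made up for illustration; the `Chat` class is copied from the question.

```kotlin
import rx.observers.TestSubscriber
import rx.subjects.PublishSubject

// Same Chat class as in the question: it 'transmits' its latest message.
class Chat(val name: Int) {
    val lastMessage: PublishSubject<String> = PublishSubject.create()

    fun postMessage(message: Int) {
        lastMessage.onNext("Chat: $name, Message: $message")
    }
}

// Hypothetical helper: replays the question's scenario and returns what the subscriber saw.
fun switchMapDropsPreviousChat(): List<String> {
    val chats: PublishSubject<Chat> = PublishSubject.create()

    // switchMap unsubscribes from the previous chat's lastMessage
    // as soon as a new Chat arrives on `chats`.
    val sequence = chats.switchMap { it.lastMessage }

    val subscriber = TestSubscriber<String>()
    sequence.subscribe(subscriber)

    val chat1 = Chat(1)
    chats.onNext(chat1)
    chat1.postMessage(1)
    chat1.postMessage(2)

    // Chat 2 replaces Chat 1 in the list.
    val chat2 = Chat(2)
    chats.onNext(chat2)
    chat2.postMessage(1)

    // Chat 1 is no longer subscribed, so this message is not relayed.
    chat1.postMessage(3)

    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2",
            "Chat: 2, Message: 1"
    )
    return subscriber.onNextEvents
}
```

Because PublishSubject delivers items synchronously on the calling thread, the order of events in this sketch is deterministic. With real asynchronous sources the same switching behavior applies, but the test would need to wait for emissions before asserting.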