android kotlin rx-java flowable

How to create a Flowable that updates from multiple data sources?


I'm building a chat application and I have the following situation: my database has three sources of data, and two of them depend on the first.

A -> is a Contact
B -> is the last unread message
C -> is the messages count

I first need to fetch the contacts, then use each contact's ID to fetch the other two. The requirement is to keep watching for any change in the contact information, the last unread message, or the message count. How can I do that with RxJava, updating only what is necessary and without blocking the UI? (Some people told me I could use Flowable for that.)

What I've tried so far:

fun queryAllChats(): Flowable<MutableList<Chat>> =
    dao.queryContactsFlowable().flatMap { contacts ->
        Flowable.fromIterable(contacts)
            .flatMapSingle { contact ->
                getLastUnreadMessage(contact.id)
                    .materialize()
                    .zipWith(
                        getUnreadCount(contact.id)
                    ) { msg: Notification<Messages>, unreadCount: Int ->
                        Chat(contact, msg.value, unreadCount)
                    }
            }.toList().toFlowable()
    }.subscribeOn(schedulers.io).observeOn(schedulers.main)

In the ViewModel:

var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())

But it seems to update only once; after that, no further data changes come through.


Solution

  • The problem in your code is that you only observe changes to the contact list; the last unread message and the unread count are fetched just once per contact, so later changes to them never reach the subscriber.

    This should work:

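      fun queryAllChats() = dao.queryContactsFlowable()
        // switchMap drops the previous inner subscriptions whenever the contact list changes
        .switchMap { contacts ->
          val chats = contacts.map { contact ->
            // Re-emits a Chat whenever this contact's count or last message changes
            Flowable.combineLatest(
              getUnreadCount(contact.id), // must return a Flowable
              getLastUnreadMessage(contact.id), // must return a Flowable
              { unreadCount, unreadMessage ->
                Chat(contact, unreadMessage, unreadCount)
              })
          }
          // The Iterable overload hands the combiner an untyped array, hence the cast
          Flowable.combineLatest(chats) { latest ->
            latest.map { it as Chat }
          }
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())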
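To see the pattern re-emit outside of Android, here is a minimal sketch that assumes RxJava 2 (`io.reactivex` packages). `FakeDao`, its processors, the simplified `Contact` and `Chat` classes, and `demo()` are hypothetical stand-ins for the Room DAO and model classes in the question, not part of the original code. It also guards against an empty contact list, because `combineLatest` over zero sources completes without emitting, which would otherwise leave the UI without an empty-list update.

```kotlin
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import io.reactivex.processors.BehaviorProcessor

// Hypothetical, simplified model classes (the real ones are not shown in the question).
data class Contact(val id: Long, val name: String)
data class Chat(val contact: Contact, val lastUnread: String, val unreadCount: Int)

// Hypothetical in-memory DAO: each processor plays the role of an observable query.
class FakeDao {
    val contacts: BehaviorProcessor<List<Contact>> = BehaviorProcessor.create()
    private val lastUnread = mutableMapOf<Long, BehaviorProcessor<String>>()
    private val unreadCount = mutableMapOf<Long, BehaviorProcessor<Int>>()

    fun lastUnreadFor(id: Long): BehaviorProcessor<String> =
        lastUnread.getOrPut(id) { BehaviorProcessor.createDefault("") }

    fun unreadCountFor(id: Long): BehaviorProcessor<Int> =
        unreadCount.getOrPut(id) { BehaviorProcessor.createDefault(0) }
}

fun queryAllChats(dao: FakeDao): Flowable<List<Chat>> =
    dao.contacts.switchMap { contacts ->
        if (contacts.isEmpty()) {
            // combineLatest over zero sources completes without emitting,
            // so the empty list is emitted explicitly.
            Flowable.just(emptyList<Chat>())
        } else {
            val perContact = contacts.map { contact ->
                // One stream per contact, updated by either dependent query.
                Flowable.combineLatest(
                    dao.unreadCountFor(contact.id),
                    dao.lastUnreadFor(contact.id),
                    BiFunction<Int, String, Chat> { count, message ->
                        Chat(contact, message, count)
                    }
                )
            }
            // Merge the per-contact streams back into one list of chats.
            Flowable.combineLatest(perContact) { latest -> latest.map { it as Chat } }
        }
    }

// Drives the fake DAO and returns every list the subscriber received.
fun demo(): List<List<Chat>> {
    val dao = FakeDao()
    val seen = mutableListOf<List<Chat>>()
    queryAllChats(dao).subscribe { seen.add(it) }

    dao.contacts.onNext(listOf(Contact(1, "Alice"))) // initial list
    dao.unreadCountFor(1).onNext(3)                  // count change re-emits
    dao.lastUnreadFor(1).onNext("hi")                // message change re-emits
    return seen
}
```

Calling `demo()` should return one list per change above, with the final list holding Alice's chat with the message "hi" and a count of 3. In the real app the two per-contact queries would need to be observable queries as well (for example, DAO methods declared to return `Flowable`), otherwise there is nothing for `combineLatest` to react to.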