I'm building a chat application and I have the following situation: on my database, I have 3 sources of data but two of them depends on the first.
A -> is a Contact
B -> is the last unread message
C -> is the messages count
I need to first fetch the contacts, then using its ID I need to fetch the other two. The requirement is to keep watching for any data change in the Contact information, or unread message or message count. How can I do that using RxJava just updating the necessary not to block the UI? (Some people told me I could use Flowable for that).
What I've tried so far:
fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap { contacts ->
Flowable.fromIterable(contacts)
.flatMapSingle { contact ->
getLastUnreadMessage(contact.id)
.materialize()
.zipWith(
getUnreadCount(contact.id)
) { msg: Notification<Messages>, unreadCount: Int ->
Chat(contact, msg.value, unreadCount)
}
}.toList().toFlowable()
}.subscribeOn(schedulers.io).observeOn(schedulers.main)
In the viewModel
var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())
But it seems it just updates once then it doesn't update any more data.
The problem in your code is, that you only observe changes of contacts list and get last messages and unread count only once per contact.
This should work:
fun queryAllChats() = dao.queryContactsFlowable()
.switchMap { contacts ->
val chats = contacts.map { contact ->
Flowable.combineLatest(
getUnreadCount(contact.id), // Flowable
getLastUnreadMessage(contact.id), // Flowable
{ unreadCount, unreadMessages ->
Chat(contact, unreadMessages, unreadCount)
})
}
return@switchMap Flowable.combineLatest(chats) {
it.map { it as Chat }
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())