I have a PublishSubject<Event>.
On each new Event I trigger a database query (for some locally cached data), then take the result and POST it to a server over HTTP. When the server replies with 200, I run another database query to delete the rows that were just sent.
This is done with a chain roughly like this:
subject
.toSerialized()
.flatMapMaybe { getCachedData() }
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
.subscribe()
The subject may emit two quick Events under certain conditions, let's say with a 10 ms interval.
The problem is that getCachedData() for the second emission runs immediately after getCachedData() for the first emission completes, i.e. before cleanCache() has had a chance to clean the database for the first emission.
I would like to combine those flatMaps into one observer somehow, so that a new emission from the subject is not processed until the whole chain has finished for the previous one, preferably without any kind of handmade semaphores.
I do subscribeOn() on a single-thread pool scheduler, but that only orders the calls inside each flatMap.
I saw some advice to add toSerialized() to subjects, but now I think it has no relation to the way chains work.
I saw there are also lift() and compose() operators. I tried to just put all the flatMaps inside the latter, but that did not change the behavior. The former I am still wondering about.
Put them inside a concatMapX subflow:
subject
.concatMapCompletable {
getCachedData()
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
}
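To see the ordering this gives, here is a minimal, self-contained sketch. It assumes RxJava 3 package names, and the three stage functions are hypothetical stand-ins that only log and delay (the real getCachedData(), uploadData() and cleanCache() are not shown in the question). uploadData is assumed to return a Maybe so that the .flatMap call above type-checks on a Maybe.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical stand-ins: each stage records when it starts, then takes ~20 ms.
fun getCachedData(id: Int, log: MutableList<String>): Maybe<String> =
    Maybe.fromCallable<String> {
        log.add("read $id")
        "rows-$id"
    }.delay(20, TimeUnit.MILLISECONDS)

fun uploadData(id: Int, log: MutableList<String>): Maybe<Int> =
    Maybe.fromCallable<Int> {
        log.add("upload $id")
        200 // pretend the server replied 200
    }.delay(20, TimeUnit.MILLISECONDS)

fun cleanCache(id: Int, log: MutableList<String>): Completable =
    Completable.fromAction { log.add("clean $id") }

fun runPipeline(): List<String> {
    val log = CopyOnWriteArrayList<String>()
    val subject = PublishSubject.create<Int>()
    val finished = CountDownLatch(1)

    subject
        // concatMapCompletable subscribes to the next inner flow only after
        // the previous one has completed; later events wait in its queue.
        .concatMapCompletable { id ->
            getCachedData(id, log)
                .flatMap { uploadData(id, log) }
                .flatMapCompletable { cleanCache(id, log) }
        }
        .subscribe({ finished.countDown() }, { finished.countDown() })

    // Two events in quick succession, then complete so the latch can release.
    subject.onNext(1)
    subject.onNext(2)
    subject.onComplete()

    finished.await(5, TimeUnit.SECONDS)
    return log
}
```

Calling runPipeline() should return the stages of event 1 (read, upload, clean) followed by the stages of event 2, with no interleaving. Replacing concatMapCompletable with flatMapCompletable in this sketch lets "read 2" start before "clean 1", which is the behavior described in the question.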
"I saw some advice to add toSerialized() to subjects, but now I think it has no relation to the way chains work"
toSerialized() has no practical effect on your flow unless you actually drive the Subject it returns, and do so from multiple threads.
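As a rough illustration of when toSerialized() does matter, the sketch below keeps the returned Subject and calls onNext on it from several threads. It assumes RxJava 3 package names; the function name and counters are made up for the example.

```kotlin
import io.reactivex.rxjava3.subjects.PublishSubject
import io.reactivex.rxjava3.subjects.Subject
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

fun emitFromManyThreads(): Pair<Int, Int> {
    // Keep the reference returned by toSerialized(); calling onNext on the
    // original PublishSubject would bypass the serialization entirely.
    val serialized: Subject<Int> = PublishSubject.create<Int>().toSerialized()

    val inFlight = AtomicInteger()    // observer callbacks running right now
    val maxInFlight = AtomicInteger() // highest overlap ever observed
    val received = AtomicInteger()    // total events delivered

    serialized.subscribe {
        val now = inFlight.incrementAndGet()
        maxInFlight.accumulateAndGet(now) { a, b -> maxOf(a, b) }
        received.incrementAndGet()
        inFlight.decrementAndGet()
    }

    // Four producer threads drive the serialized Subject concurrently.
    val producers = List(4) { t ->
        thread { repeat(1000) { i -> serialized.onNext(t * 1000 + i) } }
    }
    producers.forEach { it.join() }

    return received.get() to maxInFlight.get()
}
```

Here the observer should receive all 4000 events and never see two onNext calls overlap. In the chain from the question, the result of toSerialized() is only subscribed to and the events are still pushed into the original subject, so the call changes nothing there.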