I want my code work like this graph, but it does not work ...
My Code:
private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
/* filter target event. */
EventPredictable(cls)
).cast(
/* cast to target event */
cls
).onBackpressureDrop {
Log.i(TAG, "drop event: $it")
}.concatMap { data ->
/* start interval task blocking */
val period = 1L
val unit = TimeUnit.SECONDS
MLog.d(TAG, "startInterval: data = $data")
Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
getStopFlowable()
).map {
Pair(data, it)
}
}
}
private fun getStopFlowable(): Flowable<StopIntervalEvent> {
return RxBus.getDefault().register(StopIntervalEvent::class.java)
.toFlowable(BackpressureStrategy.LATEST)
}
when I send 140 event in 10 ms, my code drop 12 event, not dropping 140 - 4 = 136 event that I expect. Why my code don't work like the graph above? Thank for your watching and answers!
onBackpressurDrop
is always ready to receive items thus onBackpressureBuffer
has no practical effect in your setup. onBackpressurBuffer(int)
would fail on overflow so you'd never se the expected behavior with it. In addition, concatMap
fetches 2 items upfront by default so it will get source items 1 and 2.
Instead, try using the overload with the backpressure strategy configurable:
mRelay
.toFlowable(BackpressureStaregy.MISSING)
.onBackpressureBuffer(4, null, BackpressureOverflowStrategy.DROP_LATEST)
.flatMap(data ->
Flowable.intervalRange(0, DURATION.toLong(), 0, period, unit)
.takeUntil(getStopFlowable())
.map(it -> new Pair(data, it))
, 1 // <--------------------------------------------------- max concurrency
);