android rx-java backpressure

Confusion about RxJava onBackpressureBuffer and onBackpressureDrop


I want my code to behave like this diagram, but it does not: [image]

My Code:

    private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
        return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
            /* filter target event. */
            EventPredictable(cls)
        ).cast(
            /* cast to target event */
            cls
        ).onBackpressureDrop {
            Log.i(TAG, "drop event: $it")
        }.concatMap { data ->
            /* start interval task blocking */
            val period = 1L
            val unit = TimeUnit.SECONDS
            MLog.d(TAG, "startInterval: data = $data")
            Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
                getStopFlowable()
            ).map {
                Pair(data, it)
            }
        }
    }

    private fun getStopFlowable(): Flowable<StopIntervalEvent> {
        return RxBus.getDefault().register(StopIntervalEvent::class.java)
                    .toFlowable(BackpressureStrategy.LATEST)
    }

When I send 140 events within 10 ms, my code drops 12 events rather than the 140 - 4 = 136 events I expect. Why doesn't my code behave like the diagram above? Thanks for reading and for any answers!


Solution

  • onBackpressureDrop is always ready to receive items (it requests an unbounded amount from its upstream), so the onBackpressureBuffer in front of it never fills up and has no practical effect in your setup. onBackpressureBuffer(int) would also fail with a MissingBackpressureException on overflow, so you'd never see the expected behavior with it. In addition, concatMap fetches 2 items upfront by default, so it will get source items 1 and 2.

    Instead, try the overload that lets you configure the overflow strategy:

     // DURATION, period and unit are the values defined in the question
     mRelay
     .toFlowable(BackpressureStrategy.MISSING)
     // hold at most 4 pending items; on overflow, drop instead of failing
     .onBackpressureBuffer(4, null, BackpressureOverflowStrategy.DROP_LATEST)
     .flatMap(data ->
         Flowable.intervalRange(0, DURATION.toLong(), 0, period, unit)
         .takeUntil(getStopFlowable())
         .map(it -> new Pair<>(data, it))
         , 1 // <-- max concurrency: only one inner interval runs at a time
     );
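
To make the arithmetic concrete, here is a minimal plain-Java model of a bounded buffer sitting in front of a consumer that asks for one item at a time. It does not use RxJava: the class `BoundedBufferModel`, the `Result` holder, and the method `simulateBurst` are hypothetical names, and the overflow rule (evict the newest buffered item, then enqueue the incoming one) is an assumption based on how the `DROP_LATEST` strategy is documented. It also assumes the consumer stays busy for the whole burst, which should roughly match a 10 ms burst against a one-second interval.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

class BoundedBufferModel {

    /** What happened to the events of one burst. */
    static final class Result {
        final List<Integer> delivered = new ArrayList<>(); // handed to the consumer
        final List<Integer> buffered = new ArrayList<>();  // still waiting at the end
        int dropped;                                       // evicted on overflow
    }

    /**
     * Pushes events 1..events through a buffer of the given capacity.
     * The consumer has asked for initialDemand items and asks for no more
     * during the burst (assumption: it is busy with the first item).
     */
    static Result simulateBurst(int events, int capacity, int initialDemand) {
        Result result = new Result();
        ArrayDeque<Integer> buffer = new ArrayDeque<>();
        int demand = initialDemand;

        for (int event = 1; event <= events; event++) {
            if (demand > 0 && buffer.isEmpty()) {
                // Outstanding demand: the event goes straight through.
                demand--;
                result.delivered.add(event);
                continue;
            }
            if (buffer.size() == capacity) {
                // Assumed DROP_LATEST rule: evict the newest buffered item.
                buffer.pollLast();
                result.dropped++;
            }
            buffer.offer(event);
        }
        result.buffered.addAll(buffer);
        return result;
    }

    public static void main(String[] args) {
        Result r = simulateBurst(140, 4, 1);
        System.out.println("delivered=" + r.delivered
                + " buffered=" + r.buffered
                + " dropped=" + r.dropped);
    }
}
```

Under these assumptions the model reports 135 drops rather than 136: event 1 is handed straight to the consumer and never occupies a buffer slot, and 4 more events remain buffered. Setting `initialDemand` to 2 approximates concatMap's default prefetch and leaves 134 drops.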