rx-java2rx-kotlin2

Using `onBackpressureLatest` to drop intermediate messages in blocking Flowable


I have a chain where I do some blocking IO calls (e.g. HTTP-call). I want the blocking call to consume a value, proceed without interrupting, but drop everything that is piling up meanwhile, and then consume the next value in the same manner.

Consider the following example:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
    Thread.sleep(1000)
    it
  }.blockingForEach { println(it) }
}

From a naive point of view, I would it expect to print something like 0, 10, 20, ..., but it prints 0, 1, 2, ....

What am I doing wrong?

EDIT:

I thought about naively adding debounce to eat up the incoming stream:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .debounce(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

But, now I get a java.lang.InterruptedException: sleep interrupted.

EDIT:

What seems to work is the following:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

The output is as expected 0, 10, 20, ...!!

Is that the correct way?

I noted that throttleLast will switch to the Computation-Scheduler. Is there a way to go back to the original scheduler?

EDIT:

I also get an occasional java.lang.InterruptedException: sleep interrupted with that variant.


Solution

  • The most simple approach to solve the problem is:

    fun <T> Flowable<T>.lossy() : Flowable<T> {
      return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
    }
    

    By calling lossy on a Flowable it starts to drop all element that are coming in faster than the downstream consumer can process.