androidrx-javabackpressure

Understanding the capacity param in RxJava onBackpressureBuffer


Here is a small sample app that I wrote:

package ru.maksim.sample.app

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit


class MainActivity : AppCompatActivity() {
    private val subject = PublishSubject.create<Int>()
    private lateinit var disposable: Disposable
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        disposable = observeInts()
            .subscribe(
                {
                    Log.d("SampleApp", "next=$it")
                },
                {
                    Log.e("SampleApp", "error", it)
                },
                {
                    Log.d("SampleApp", "complete")
                }
            )
        start.setOnClickListener {
            subject.onNext(1)
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }

    private fun observeInts() = subject
        .toFlowable(BackpressureStrategy.BUFFER)
        .onBackpressureBuffer(4, {
            Log.d("SampleApp", "Overflow")
        }, BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.computation())
        .flatMap {
            Log.d("SampleApp", "onNext BEFORE delay: $it")
            Flowable.just(it)
        }
        .delay(10L, TimeUnit.SECONDS)
        .flatMap {
            Log.d("SampleApp", "onNext AFTER delay: $it")
            Flowable.just(it)
        }
}

start is just a button. After pressing the button more than 4 (4 is the buffer capacity as you can see in onBackpressureBuffer) times, I expected to see Overflow is logs, but it didn't happen. I don't understand why.


Solution

  • I think I found an answer here. Namely,

    onBackpressureBuffer(int capacity)
    

    This is a bounded version that signals BufferOverflowErrorin case its buffer reaches the given capacity.

    Flowable.range(1, 1_000_000)
              .onBackpressureBuffer(16)
              .observeOn(Schedulers.computation())
              .subscribe(e -> { }, Throwable::printStackTrace);
    

    The relevance of this operator is decreasing as more and more operators now allow setting their buffer sizes. For the rest, this gives an opportunity to "extend their internal buffer" by having a larger number with onBackpressureBuffer than their default.


    It looks like in addition to 16 passed to onBackpressureBuffer other operators have their own buffers. And when the 17-th item is received, the preceding 16 might be buffered in different operators' buffers.