Here is a small sample app that I wrote:
    package ru.maksim.sample.app

    import android.os.Bundle
    import android.util.Log
    import androidx.appcompat.app.AppCompatActivity
    import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
    import io.reactivex.rxjava3.core.BackpressureStrategy
    import io.reactivex.rxjava3.core.Flowable
    import io.reactivex.rxjava3.disposables.Disposable
    import io.reactivex.rxjava3.schedulers.Schedulers
    import io.reactivex.rxjava3.subjects.PublishSubject
    import kotlinx.android.synthetic.main.activity_main.*
    import java.util.concurrent.TimeUnit

    class MainActivity : AppCompatActivity() {

        private val subject = PublishSubject.create<Int>()
        private lateinit var disposable: Disposable

        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            disposable = observeInts()
                .subscribe(
                    {
                        Log.d("SampleApp", "next=$it")
                    },
                    {
                        Log.e("SampleApp", "error", it)
                    },
                    {
                        Log.d("SampleApp", "complete")
                    }
                )
            start.setOnClickListener {
                subject.onNext(1)
            }
        }

        override fun onDestroy() {
            disposable.dispose()
            super.onDestroy()
        }

        private fun observeInts() = subject
            .toFlowable(BackpressureStrategy.BUFFER)
            .onBackpressureBuffer(4, {
                Log.d("SampleApp", "Overflow")
            }, BackpressureOverflowStrategy.DROP_LATEST)
            .observeOn(Schedulers.computation())
            .flatMap {
                Log.d("SampleApp", "onNext BEFORE delay: $it")
                Flowable.just(it)
            }
            .delay(10L, TimeUnit.SECONDS)
            .flatMap {
                Log.d("SampleApp", "onNext AFTER delay: $it")
                Flowable.just(it)
            }
    }
start is just a button. After pressing the button more than 4 times (4 is the buffer capacity passed to onBackpressureBuffer), I expected to see Overflow in the logs, but it didn't happen. I don't understand why.
I think I found an answer here. Namely,
onBackpressureBuffer(int capacity)
This is a bounded version that signals BufferOverflowError in case its buffer reaches the given capacity.
    Flowable.range(1, 1_000_000)
        .onBackpressureBuffer(16)
        .observeOn(Schedulers.computation())
        .subscribe(e -> { }, Throwable::printStackTrace);
The relevance of this operator is decreasing as more and more operators now allow setting their buffer sizes. For the rest, this gives an opportunity to "extend their internal buffer" by having a larger number with onBackpressureBuffer than their default.
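It looks like, in addition to the capacity of 16 passed to onBackpressureBuffer in that example, other operators have their own buffers. So by the time the 17th item arrives, the preceding 16 may already have been moved into the downstream operators' buffers, and the onBackpressureBuffer buffer is not full. The same applies to my sample with a capacity of 4: the operators after onBackpressureBuffer (observeOn, flatMap) request items in batches and hold them in their own buffers, so a few button presses never fill the bounded buffer.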
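
To check this explanation, here is a minimal sketch, not a fix for the app above. It assumes RxJava 3 on the classpath and uses the observeOn(scheduler, delayError, bufferSize) overload with bufferSize = 1, so that observeOn prefetches only one item instead of its default of Flowable.bufferSize(). The helper name countOverflows and the latch-based blocking consumer are hypothetical and only there to make the effect observable without Android.

```kotlin
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: pushes `emissions` items at a blocked consumer and
// returns how many times the overflow callback was invoked.
fun countOverflows(emissions: Int): Int {
    val overflows = AtomicInteger()
    val gate = CountDownLatch(1) // keeps the consumer busy while we emit
    val subject = PublishSubject.create<Int>()

    subject
        .toFlowable(BackpressureStrategy.BUFFER)
        .onBackpressureBuffer(
            4,
            { overflows.incrementAndGet() },
            BackpressureOverflowStrategy.DROP_LATEST
        )
        // bufferSize = 1: observeOn requests one item at a time, so items
        // pile up in the bounded buffer above instead of in observeOn.
        .observeOn(Schedulers.computation(), false, 1)
        .subscribe { gate.await() } // consumer blocks until the gate opens

    repeat(emissions) { subject.onNext(it) }
    val result = overflows.get()

    gate.countDown()     // release the consumer
    subject.onComplete() // let the remaining items drain and the flow finish
    return result
}

fun main() {
    println("overflow callbacks: ${countOverflows(20)}")
}
```

With the consumer blocked and observeOn holding only a single item, the bounded buffer should fill up and the overflow callback should fire. Removing the bufferSize argument should make the overflows disappear for a small number of emissions, which matches what I saw in the app.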