rx-javabackpressurerx-java3

Unbounded backpressure in Flowable rxjava3


Reactive Streams is built around back pressure what is great and I would like to better understand how to use RxJava3 API. The approach to drop overflow data for GUI events is perfectly fine but if I process file / Kafka topic then usually it is capped by speed of inserting data into Database and want to ensure I use proper RxJava patterns to not loose data and slowdown producers.

I posted a question RxJava group and was asked to post it here.

What is the reason that Flowable#subscribe does not have an version to specify request size for the subscription but all use FlowableInternalHelper.RequestMax.INSTANCE.

What is an background that default subscription size is not correlated with the default buffer size (which is 128 - so subscription could be by default 64)?

Am I reading API right that to apply backpressure and not loose data / fail with error I need to use subscribeWith(new DisposableSubscriber..) like below and there is not shortcut in the API for that?

.subscribeWith(
    new DisposableSubscriber<List<Buffer>>() {
      @Override
      public void onStart() {
        request(1);
      }

      @Override
      public void onNext(List<Buffer> buffers) {
        // ...
        request(1);
      }

      @Override
      public void onError(Throwable throwable) {
        // ...
      }

      @Override
      public void onComplete() {
        // ...
      }
    });

// or similar example with vertx promise
.subscribeWith(new RequestSub<>(promise::fail, promise::complete));

@RequiredArgsConstructor
public class RequestSub<T> extends DisposableSubscriber<T> {

  private final Consumer<? super Throwable> onError;
  private final Runnable onComplete;

  @Override
  public void onStart() {
    request(1);
  }

  @Override
  public void onNext(T next) {
    request(1);
  }

  @Override
  public void onError(Throwable throwable) {
    onError.accept(throwable);
  }

  @Override
  public void onComplete() {
    onComplete.run();
  }
}

I wrote small a rxjava-agent to let me catch easier situation when unbounded subscriptions are used - API is documented but it is easier for me to see what actually happens.


Solution

  • There is no shortcut because there is no good request pattern to default to. Consequently, we didn't want to bloat the API surface even further.

    If you need customization, use the approach you already discovered by implementing Subscriber or extending one of the built-in ones.

    Also unless you do async work in your subscriber, the request pattern has no benefit over unbounded since it always consumes its upstream synchronously. I.e., if you request 1 upfront, then 1 every time in onNext, the upstream will see it can continue immediately with the next item if available to it.