androidrx-java2

How to cancel the Flowable upstream's block logic when a TimeoutException throws


I have a rxjava2 code, like below:

public class Demo {

    /** @noinspection ResultOfMethodCallIgnored*/
    @SuppressLint("CheckResult")
    public static void test() {
        Flowable.just(false).map(aBoolean -> block())
                .retryWhen(throwableFlowable -> {
                    AtomicInteger retryCounter = new AtomicInteger();
                    return throwableFlowable.takeWhile(throwable -> {
                        if (retryCounter.getAndIncrement() < 5) {
                            return true;
                        }

                        Exception exception;
                        try {
                            exception = (Exception) throwable;
                        } catch (Exception e) {
                            exception = new Exception(throwable);
                        }
                        throw exception;
                    });
                }).timeout(1000, TimeUnit.MILLISECONDS).subscribe(aBoolean -> {
                    Log.e("Demo", "onNext");
                }, throwable -> {
                    Log.e("Demo", "onError", throwable);
                }, () -> {
                    Log.e("Demo", "onComplete");
                }, subscription -> {
                    subscription.request(Long.MAX_VALUE);
                });
    }

    private static boolean block() {
        Log.e("Demo", "block");
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            // do nothing
        }
        Log.e("Demo", "block end");
        return true;
    }

}

When a TimeoutException throws, the block() method code still running, and the "block end" log will print after a while.

I have no idea. I expect that I could cancel the Flowable upstream's block logic when a TimeoutException throws. And even more I could make block method stop running! What can I try next?


Solution

  • Since the block() runs on the test thread and RxJava has no handle on it, there is no way in your setup a cancellation would unblock the chain.

    To interrupt a blocking call in RxJava, you have to make sure it runs on a Scheduler. Thus, apply subscribeOn(Schedulers.io()).

    However, the code would then fall off the test method so it is necessary to block on the result of the chain now:

    Flowable.just(false)
    .subscribeOn(Schedulers.io()) // <--------------------------------
    .map(aBoolean -> block())
    .retryWhen(throwableFlowable -> {
         AtomicInteger retryCounter = new AtomicInteger();
         return throwableFlowable.takeWhile(throwable -> {
             if (retryCounter.getAndIncrement() < 5) {
                 return true;
             }
    
             Exception exception;
             try {
                 exception = (Exception) throwable;
             } catch (Exception e) {
                 exception = new Exception(throwable);
             }
             throw exception;
         });
    })
    .timeout(1000, TimeUnit.MILLISECONDS)
    .blockingSubscribe( // <------------------------------------------
                    aBoolean -> {
                        Log.e("Demo", "onNext");
                    }, throwable -> {
                        Log.e("Demo", "onError", throwable);
                    }, () -> {
                        Log.e("Demo", "onComplete");
                    });