I have a rxjava2 code, like below:
public class Demo {
/** @noinspection ResultOfMethodCallIgnored*/
@SuppressLint("CheckResult")
public static void test() {
Flowable.just(false).map(aBoolean -> block())
.retryWhen(throwableFlowable -> {
AtomicInteger retryCounter = new AtomicInteger();
return throwableFlowable.takeWhile(throwable -> {
if (retryCounter.getAndIncrement() < 5) {
return true;
}
Exception exception;
try {
exception = (Exception) throwable;
} catch (Exception e) {
exception = new Exception(throwable);
}
throw exception;
});
}).timeout(1000, TimeUnit.MILLISECONDS).subscribe(aBoolean -> {
Log.e("Demo", "onNext");
}, throwable -> {
Log.e("Demo", "onError", throwable);
}, () -> {
Log.e("Demo", "onComplete");
}, subscription -> {
subscription.request(Long.MAX_VALUE);
});
}
private static boolean block() {
Log.e("Demo", "block");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
// do nothing
}
Log.e("Demo", "block end");
return true;
}
}
When a TimeoutException
throws, the block()
method code still running, and the "block end" log will print after a while.
I have no idea. I expect that I could cancel the Flowable upstream's block logic when a TimeoutException throws. And even more I could make block
method stop running! What can I try next?
Since the block() runs on the test thread and RxJava has no handle on it, there is no way in your setup a cancellation would unblock the chain.
To interrupt a blocking call in RxJava, you have to make sure it runs on a Scheduler. Thus, apply subscribeOn(Schedulers.io())
.
However, the code would then fall off the test method so it is necessary to block on the result of the chain now:
Flowable.just(false)
.subscribeOn(Schedulers.io()) // <--------------------------------
.map(aBoolean -> block())
.retryWhen(throwableFlowable -> {
AtomicInteger retryCounter = new AtomicInteger();
return throwableFlowable.takeWhile(throwable -> {
if (retryCounter.getAndIncrement() < 5) {
return true;
}
Exception exception;
try {
exception = (Exception) throwable;
} catch (Exception e) {
exception = new Exception(throwable);
}
throw exception;
});
})
.timeout(1000, TimeUnit.MILLISECONDS)
.blockingSubscribe( // <------------------------------------------
aBoolean -> {
Log.e("Demo", "onNext");
}, throwable -> {
Log.e("Demo", "onError", throwable);
}, () -> {
Log.e("Demo", "onComplete");
});