rx-java

Throw exception out of RxJava's scope


I am using a library that exposes an RxJava2 API, but my own code (written in Kotlin) does not use RxJava. I would like exceptions raised inside the RxJava chain to propagate to the "outside world", so that my code can catch them.

So I want to do something like this:

fun myFun() {
    someCompletable
        .doOnError { throw it }
        .subscribe()
}

And I want the caller of myFun() not to have to know about RxJava at all. The problem is that when the chain fails, the exception arrives wrapped in an OnErrorNotImplementedException, which I don't want:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | [here is the actual exception]

Is there a way to have myFun() throw the actual exception instead of OnErrorNotImplementedException?




Solution

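  • One way is to call .blockingAwait(), which blocks the calling thread until the Completable completes or fails. Its Javadoc describes the error behavior: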

    If the source signals an error, the operator wraps a checked Exception into RuntimeException and throws that. Otherwise, RuntimeExceptions and Errors are rethrown as they are.

    With Kotlin coroutines, the Completable can also be bridged to a suspending call with suspendCoroutine or suspendCancellableCoroutine, like so:

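    // Needs kotlinx.coroutines.suspendCancellableCoroutine plus
    // kotlin.coroutines.resume and kotlin.coroutines.resumeWithException.
    suspendCancellableCoroutine<Unit> { cont ->
        // Resume normally on completion; on error, rethrow the original exception in the caller.
        val disposable = someCompletable
            .subscribe({ cont.resume(Unit) }, { e -> cont.resumeWithException(e) })
        // Dispose the subscription if the coroutine is cancelled.
        cont.invokeOnCancellation { disposable.dispose() }
    }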
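To illustrate the .blockingAwait() option described above, here is a minimal sketch. It assumes RxJava 2 (io.reactivex) is on the classpath. myFun taking the Completable as a parameter and the two Completable.error(...) sources are stand-ins made up for the demonstration, not part of the library from the question.

```kotlin
import io.reactivex.Completable
import java.io.IOException

// Blocks the calling thread until the Completable terminates. On error,
// the exception is thrown to the caller instead of being routed to an
// onError handler, so no OnErrorNotImplementedException is involved.
fun myFun(source: Completable) {
    source.blockingAwait()
}

fun main() {
    // An unchecked exception reaches the caller unchanged.
    try {
        myFun(Completable.error(IllegalStateException("boom")))
    } catch (e: IllegalStateException) {
        println("caught: ${e.message}")
    }

    // A checked exception arrives wrapped in a RuntimeException,
    // with the original exception available as its cause.
    try {
        myFun(Completable.error(IOException("disk")))
    } catch (e: RuntimeException) {
        println("cause: ${e.cause}")
    }
}
```

Because the call blocks, it is probably best kept off threads that must stay responsive (a UI thread, for example). The coroutine bridge avoids that at the cost of making the caller a suspending function.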