I have the following code, which creates a simple Observable, emits an Integer, and calls onComplete. According to https://medium.com/@ValCanBuild/making-rxjava-code-tidier-with-doonsubscribe-and-dofinally-3748f223d32d, doFinally "calls the specified action after this Observable signals onError or onCompleted or gets disposed by the downstream."
However, even after onComplete is called (I verified this; see the output below the code), doFinally never appears to run.
import io.reactivex.rxjava3.core.Observable;

public class Main {
    public static void main(String[] args) {
        test();
    }

    public static void test() {
        System.out.println("Start");
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onComplete();
        });
        observable.doFinally(() -> System.out.println("Test do finally before"));
        observable.blockingSubscribe(System.out::println, System.out::println, () -> {
            System.out.println("ON Complete");
        });
        observable.doFinally(() -> System.out.println("Test do finally after"));
        System.out.println("End");
    }
}
Output
Start
1
ON Complete
End
As I understand it, since this is a blocking stream, doFinally should run after onComplete. What am I missing?
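The problem is that you didn't build one subscription; you built three separate observables from the same source, and only the second one (the blockingSubscribe() call) is ever subscribed to. The two observables returned by doFinally() are discarded, so their actions never run. As a diagram, it looks like this: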
              /--- doFinally ("before")    (unused built observable)
             /
observable --+----- blockingSubscribe()    (normal subscription)
             \
              \--- doFinally ("after")     (unused built observable)
Instead, you want to "chain" these methods, because each operator returns a new Observable<T> instance with the adjusted behavior and leaves the original untouched. In your case you can write your code like this:
import io.reactivex.rxjava3.core.Observable;

public class Main {
    public static void main(String[] args) throws Exception {
        System.out.println("Start");
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        });
        System.out.println("Building subscription");
        // doFinally is now part of the chain that actually gets subscribed to
        observable
            .doFinally(() -> System.out.println("Test do finally before"))
            .blockingSubscribe(System.out::println, System.out::println, () -> {
                System.out.println("ON Complete");
            });
        System.out.println("End");
    }
}
This will generate the following output:
Start
Building subscription
Test do finally before
1
2
3
ON Complete
End
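The difference between discarding and chaining the result of doFinally() can be checked with a small sketch. The class and method names below (DiscardedOperatorDemo, finallyRunsWhenResultDiscarded, finallyRunsWhenChained) are made up for illustration, and Observable.just(1) stands in for the create(...) source above; it assumes RxJava 3 is on the classpath.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.atomic.AtomicBoolean;

public class DiscardedOperatorDemo {

    // Mirrors the question: the Observable returned by doFinally is thrown away.
    static boolean finallyRunsWhenResultDiscarded() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Observable<Integer> source = Observable.just(1);
        source.doFinally(() -> ran.set(true)); // return value ignored, never subscribed
        source.blockingSubscribe(v -> { });    // subscribes to the undecorated source
        return ran.get();
    }

    // Chained: the subscription goes through the Observable returned by doFinally.
    static boolean finallyRunsWhenChained() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Observable<Integer> source = Observable.just(1);
        source.doFinally(() -> ran.set(true))
              .blockingSubscribe(v -> { });
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("discarded: " + finallyRunsWhenResultDiscarded()); // false
        System.out.println("chained:   " + finallyRunsWhenChained());         // true
    }
}
```

The flag is only set in the chained variant, which matches the diagram: an operator that nobody subscribes to does nothing.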
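Be careful with the interaction between doFinally() and blockingSubscribe(). As you can see, the "Test do finally before" line is printed before the items are delivered, which might not be what you had in mind. blockingSubscribe() first lets the source run and buffers its signals, then hands them to your callbacks on the calling thread, so the doFinally() action fires as soon as the source has completed. Since this source emits synchronously, you can switch to a plain subscribe() to get the output "at the end":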
import io.reactivex.rxjava3.core.Observable;

public class Main {
    public static void main(String[] args) throws Exception {
        System.out.println("Start");
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        });
        System.out.println("Building subscription");
        // subscribe() delivers each signal directly, so doFinally runs after onComplete
        observable
            .doFinally(() -> System.out.println("Test do finally before"))
            .subscribe(System.out::println, System.err::println, () -> {
                System.out.println("ON Complete");
            });
        System.out.println("End");
    }
}
This will generate the following output:
Start
Building subscription
1
2
3
ON Complete
Test do finally before
End
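To compare the two orderings side by side without reading console output, the events can be recorded into a list. This is only a sketch: FinallyOrderDemo and its method names are hypothetical, the source emits a single item synchronously, and RxJava 3 is assumed to be on the classpath.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

public class FinallyOrderDemo {

    // A synchronous source with doFinally attached; every event is appended to log.
    static Observable<Integer> source(List<String> log) {
        return Observable.<Integer>create(emitter -> {
            emitter.onNext(1);
            emitter.onComplete();
        }).doFinally(() -> log.add("finally"));
    }

    // blockingSubscribe: the source runs to completion first, callbacks are replayed afterwards.
    static List<String> withBlockingSubscribe() {
        List<String> log = new ArrayList<>();
        source(log).blockingSubscribe(
                v -> log.add("next " + v),
                e -> log.add("error"),
                () -> log.add("complete"));
        return log;
    }

    // subscribe: each signal reaches the callbacks directly, doFinally runs last.
    static List<String> withSubscribe() {
        List<String> log = new ArrayList<>();
        source(log).subscribe(
                v -> log.add("next " + v),
                e -> log.add("error"),
                () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(withBlockingSubscribe()); // [finally, next 1, complete]
        System.out.println(withSubscribe());         // [next 1, complete, finally]
    }
}
```

Note that the subscribe() variant only finishes before the next statement because the source is synchronous; with a source that emits on another thread, subscribe() would return immediately.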