I am trying to learn RxJava and going through some content. I came through this example where Subscriber unsubscribes soon after subscribing to observable.
public static void main(String[] args) {
Observable<Integer> ints = Observable.create(subscriber -> {
Runnable r = () -> {
sleep(10, SECONDS);
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(5);
subscriber.onCompleted();
}
};
Thread thread = new Thread(r);
thread.start();
Subscription subscription = Subscriptions.create(thread::interrupt);
System.out.println("inner subscription : "+subscription);
subscriber.add(subscription);
});
Subscription subscription = ints.subscribe(x->System.out.println(x));
System.out.println("outer subscription : "+subscription);
subscription.unsubscribe();
}
static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException ignored) {
//intentionally ignored
}
}
As we are creatig a subscription in main method and adding it into subscriber subscription callback will interupt the thread immediately without waiting for 10 seconds.
My understanding was that subscrption which we have added should be same as the one on which unsubscribe is called but when I am printing both on console they are different.
inner subscription : rx.subscriptions.BooleanSubscription@215be6bb outer subscription : rx.observers.SafeSubscriber@5d5eef3d
Thanks
- if subscription are different how calling unsubscribe on one is triggering interrupt on thread.
SafeSubscriber
wraps the original Subscriber
, in documentation you can read:
SafeSubscriber is a wrapper around Subscriber that ensures that the Subscriber complies with the Observable contract .
So your unsubscribe()
call will eventueally call your subscription
which will interrupt your thread. You can investigate it on your own in debugger, looking at the SafeSubscriber
instance, in actual->subscriptions list.
- Does we have 1 subscriber with two different subscription here ?
No, there is only one Subscriber
here, the SafeSubscriber
is just a wrapper around the original Subscriber
to ensure the RxJava contract is correctly followed.
You also should be aware that your code indicates you are following some rxjava1 guides/books, Subscription
class was replaced in rxjava2 with Disposable
class, over 4 (?) years ago. Currently rxjava is at version 3.