java multithreading observable rx-java subscriber

Callback on calling unsubscribe from subscription RxJava


I am trying to learn RxJava and am working through some material. I came across this example, in which the Subscriber unsubscribes right after subscribing to the Observable.

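    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.Subscription;
    import rx.subscriptions.Subscriptions;

    import static java.util.concurrent.TimeUnit.SECONDS;

    // Class name added only so the snippet compiles on its own (RxJava 1.x).
    public class UnsubscribeExample {

        public static void main(String[] args) {
            Observable<Integer> ints = Observable.create(subscriber -> {
                // Emit a single value from a background thread after a delay.
                Runnable r = () -> {
                    sleep(10, SECONDS);
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(5);
                        subscriber.onCompleted();
                    }
                };
                Thread thread = new Thread(r);
                thread.start();
                // Interrupt the worker thread when the subscriber unsubscribes.
                Subscription subscription = Subscriptions.create(thread::interrupt);
                System.out.println("inner subscription : " + subscription);
                subscriber.add(subscription);
            });

            Subscription subscription = ints.subscribe(x -> System.out.println(x));
            System.out.println("outer subscription : " + subscription);
            subscription.unsubscribe();
        }

        static void sleep(int timeout, TimeUnit unit) {
            try {
                unit.sleep(timeout);
            } catch (InterruptedException ignored) {
                // intentionally ignored
            }
        }
    }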

Because we create a subscription and add it to the subscriber, calling unsubscribe runs the subscription callback, which interrupts the thread immediately instead of letting it wait for 10 seconds.

My understanding was that the subscription we added should be the same object as the one on which unsubscribe is called, but when I print both to the console they are different.

  1. If the subscriptions are different, how does calling unsubscribe on one trigger the interrupt on the thread?

    inner subscription : rx.subscriptions.BooleanSubscription@215be6bb
    outer subscription : rx.observers.SafeSubscriber@5d5eef3d

  2. Do we have one subscriber with two different subscriptions here?

Thanks


Solution

    1. If the subscriptions are different, how does calling unsubscribe on one trigger the interrupt on the thread?

    SafeSubscriber wraps the original Subscriber. The documentation says:

    SafeSubscriber is a wrapper around Subscriber that ensures that the Subscriber complies with the Observable contract.

    So your unsubscribe() call will eventually reach the subscription you added, which interrupts your thread. You can verify this yourself in a debugger by inspecting the SafeSubscriber instance and looking at its actual->subscriptions list.

    2. Do we have one subscriber with two different subscriptions here?

    No, there is only one Subscriber here, the SafeSubscriber is just a wrapper around the original Subscriber to ensure the RxJava contract is correctly followed.
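The wrapper relationship described in both answers can be modelled without RxJava. The following is a minimal sketch, not the library's actual source: `SubscriptionList`, `Subscriber`, `SafeWrapper`, and `ActionSubscription` are hypothetical stand-ins written for illustration. It assumes, as RxJava 1.x does, that a wrapping subscriber shares its subscription list with the subscriber it wraps, so a teardown action added through the wrapper runs when the wrapper is unsubscribed.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperSketch {

    /** Hypothetical stand-in for rx.Subscription. */
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Hypothetical composite: unsubscribing it unsubscribes every child. */
    static final class SubscriptionList implements Subscription {
        private final List<Subscription> children = new ArrayList<>();
        private boolean unsubscribed;

        void add(Subscription s) {
            if (unsubscribed) {
                s.unsubscribe(); // late additions are torn down right away
            } else {
                children.add(s);
            }
        }

        @Override public void unsubscribe() {
            if (unsubscribed) return;
            unsubscribed = true;
            for (Subscription s : children) s.unsubscribe();
            children.clear();
        }

        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Hypothetical subscriber: it is itself a Subscription backed by a list. */
    static class Subscriber implements Subscription {
        final SubscriptionList subscriptions;

        Subscriber() { this.subscriptions = new SubscriptionList(); }

        /** Wrapping constructor: shares the wrapped subscriber's list (assumption). */
        Subscriber(Subscriber actual) { this.subscriptions = actual.subscriptions; }

        void add(Subscription s) { subscriptions.add(s); }
        @Override public void unsubscribe() { subscriptions.unsubscribe(); }
        @Override public boolean isUnsubscribed() { return subscriptions.isUnsubscribed(); }
    }

    /** Plays the role of SafeSubscriber: a wrapper, not a second subscriber chain. */
    static final class SafeWrapper extends Subscriber {
        SafeWrapper(Subscriber actual) { super(actual); }
    }

    /** Plays the role of Subscriptions.create(action): runs the action once. */
    static final class ActionSubscription implements Subscription {
        private final Runnable action;
        private boolean done;

        ActionSubscription(Runnable action) { this.action = action; }

        @Override public void unsubscribe() {
            if (!done) { done = true; action.run(); }
        }

        @Override public boolean isUnsubscribed() { return done; }
    }

    /** Returns true if unsubscribing the wrapper ran the inner teardown action. */
    static boolean demo() {
        boolean[] teardownRan = {false};
        Subscriber original = new Subscriber();
        Subscriber wrapper = new SafeWrapper(original);   // the "outer subscription"
        Subscription inner = new ActionSubscription(() -> teardownRan[0] = true);
        wrapper.add(inner);                                // the "inner subscription"
        wrapper.unsubscribe();                             // different object, same list
        return teardownRan[0] && inner.isUnsubscribed() && original.isUnsubscribed();
    }

    public static void main(String[] args) {
        System.out.println("teardown ran: " + demo());
    }
}
```

In this model the object returned to the caller (the wrapper) and the object created inside the callback (the action subscription) are distinct, which matches the two different values printed in the question, yet unsubscribing the first still reaches the second through the shared list.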

    You should also be aware that your code suggests you are following RxJava 1 guides or books. In RxJava 2 the Subscription type was replaced by Disposable, over 4 (?) years ago. RxJava is currently at version 3.