
Calling dispose() when passing a subscriber that overrides onSubscribe


I am new to RxJava. If I understand correctly, the Observer receives the Disposable in onSubscribe so that it can manually stop processing once dispose() has been called.
I created the following code:

@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
    private Disposable d;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.d = d;
    }

    @Override
    public void onNext(@NonNull Long aLong) {
        if (!d.isDisposed()) {
            System.out.println("Number onNext = " + aLong);
        }
    }

    @Override
    public void onError(@NonNull Throwable e) {

    }

    @Override
    public void onComplete() {
        System.out.println("completed");
    }
});

However, I can't figure out how to call dispose() for that subscription. The subscribe overload that takes an Observer returns void, and subscribeWith does not accept my Observer without compile errors.

How is this supposed to work? What am I misunderstanding here?


Solution

  • The Javadoc of Observable has a straightforward example:

    Disposable d = Observable.just("Hello world!")
         .delay(1, TimeUnit.SECONDS)
         .subscribeWith(new DisposableObserver<String>() {
             @Override public void onStart() {
                 System.out.println("Start!");
             }
             @Override public void onNext(String t) {
                 System.out.println(t);
             }
             @Override public void onError(Throwable t) {
                 t.printStackTrace();
             }
             @Override public void onComplete() {
                 System.out.println("Done!");
             }
         });
    
     Thread.sleep(500);
     // the sequence can now be disposed via dispose()
     d.dispose();
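
As for why subscribeWith seemed to reject the plain Observer from the question: in RxJava 3 it is declared as <E extends Observer<? super T>> E subscribeWith(E observer), so it returns the same object that was passed in. A plain Observer is not a Disposable, so assigning the result to a Disposable does not compile (presumably the error seen in the question), whereas DisposableObserver implements both interfaces. The sketch below models this with simplified stand-in types written only for illustration (a hypothetical Source, plus cut-down Disposable, Observer and DisposableObserver); they are not the real RxJava classes and leave out thread-safety, onError and onComplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SubscribeWithSketch {

    // Simplified stand-ins for the RxJava types of the same name (NOT the real classes).
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T item);
    }

    // An Observer that is also a Disposable: it stores the upstream Disposable
    // handed to onSubscribe and forwards dispose() to it.
    abstract static class DisposableObserver<T> implements Observer<T>, Disposable {
        private Disposable upstream;

        @Override public final void onSubscribe(Disposable d) { upstream = d; }
        @Override public final void dispose() { upstream.dispose(); }
        @Override public final boolean isDisposed() { return upstream.isDisposed(); }
    }

    // Hypothetical synchronous source that emits a fixed list of items.
    static final class Source<T> {
        private final List<T> items;

        Source(List<T> items) { this.items = items; }

        // Mirrors subscribe(Observer): nothing is returned to the caller.
        void subscribe(Observer<? super T> observer) {
            AtomicBoolean disposed = new AtomicBoolean();
            observer.onSubscribe(new Disposable() {
                @Override public void dispose() { disposed.set(true); }
                @Override public boolean isDisposed() { return disposed.get(); }
            });
            for (T item : items) {
                if (disposed.get()) {
                    return; // stop emitting once the observer has been disposed
                }
                observer.onNext(item);
            }
        }

        // Subscribes and returns the very object that was passed in.
        <E extends Observer<? super T>> E subscribeWith(E observer) {
            subscribe(observer);
            return observer;
        }
    }

    // Collects items and disposes itself after receiving the value 2.
    public static List<Integer> demo() {
        List<Integer> seen = new ArrayList<>();
        // Compiles because DisposableObserver is a Disposable; a plain
        // Observer passed to subscribeWith could not be assigned to d.
        Disposable d = new Source<Integer>(List.of(1, 2, 3))
                .subscribeWith(new DisposableObserver<Integer>() {
                    @Override public void onNext(Integer item) {
                        seen.add(item);
                        if (item == 2) {
                            dispose();
                        }
                    }
                });
        if (!d.isDisposed()) {
            throw new IllegalStateException("expected the observer to be disposed");
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2]
    }
}
```

With the real library the same idea applies to the question's code: extending DisposableObserver<Long> instead of implementing Observer<Long> lets subscribeWith hand back something on which dispose() can be called.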
    

    Edit

    The following examples show ways to get the Disposable out of the onSubscribe method, but they are generally not recommended:

    // field in the owner class
    Disposable disposable;
    
    public void doReactive() {
        Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
        src.subscribe(new Observer<Long>() {
    
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                disposable = d;
            }
    
            // ...
        });
    }
    
    public void cleanup() {
        if (disposable != null) {
            disposable.dispose();
            disposable = null;
        }
    }
    

    or

    SerialDisposable sd = new SerialDisposable();
    
    Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {
    
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            sd.set(d);
        }
    
        // ...
    });
    
    // ...
    
    sd.dispose();
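
One reason the SerialDisposable variant is more robust than a plain field is the order of events: per its Javadoc, the container disposes whatever it currently holds when it is disposed, and a Disposable handed to set() after that is disposed right away. The following is a minimal sketch of that behaviour using stand-in types written for illustration (SerialContainer and FlagDisposable are hypothetical names, and Disposable here is a cut-down copy of the interface); it is not the library's implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class SerialDisposableSketch {

    // Simplified stand-in for RxJava's Disposable (NOT the real interface).
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    // Trivial Disposable that only records whether it was disposed.
    static final class FlagDisposable implements Disposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        @Override public void dispose() { disposed.set(true); }
        @Override public boolean isDisposed() { return disposed.get(); }
    }

    // Stand-in container modelled on the documented behaviour of SerialDisposable.
    static final class SerialContainer implements Disposable {
        // Marker stored in the reference once the container itself is disposed.
        private static final Disposable DISPOSED = new FlagDisposable();
        private final AtomicReference<Disposable> current = new AtomicReference<>();

        // Replaces the held Disposable and disposes the previous one; if the
        // container is already disposed, disposes 'next' immediately instead.
        boolean set(Disposable next) {
            for (;;) {
                Disposable old = current.get();
                if (old == DISPOSED) {
                    next.dispose();
                    return false;
                }
                if (current.compareAndSet(old, next)) {
                    if (old != null) {
                        old.dispose();
                    }
                    return true;
                }
            }
        }

        @Override public void dispose() {
            Disposable old = current.getAndSet(DISPOSED);
            if (old != null && old != DISPOSED) {
                old.dispose();
            }
        }

        @Override public boolean isDisposed() {
            return current.get() == DISPOSED;
        }
    }

    // True if a Disposable that arrives AFTER dispose() is still cleaned up.
    public static boolean lateSetIsDisposed() {
        SerialContainer sd = new SerialContainer();
        sd.dispose();                              // cleanup runs first
        FlagDisposable late = new FlagDisposable();
        sd.set(late);                              // onSubscribe arrives afterwards
        return late.isDisposed();
    }

    // True if dispose() reaches a Disposable that was set earlier.
    public static boolean earlySetIsDisposed() {
        SerialContainer sd = new SerialContainer();
        FlagDisposable early = new FlagDisposable();
        sd.set(early);                             // onSubscribe arrives first
        sd.dispose();
        return early.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(lateSetIsDisposed());   // prints true
        System.out.println(earlySetIsDisposed());  // prints true
    }
}
```

With the plain-field variant, a cleanup() call that runs before onSubscribe would find the field still null and the later subscription would keep running; the container avoids that gap.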