I am new to RxJava. If I understand correctly, the Observer is passed the Disposable in onSubscribe so that it can manually stop processing if dispose() has already been called.
I created the following code:
@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
    private Disposable d;
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.d = d;
    }
    @Override
    public void onNext(@NonNull Long aLong) {
        if (!d.isDisposed()) {
            System.out.println("Number onNext = " + aLong);
        }
    }
    @Override
    public void onError(@NonNull Throwable e) {
    }
    @Override
    public void onComplete() {
        System.out.println("completed");
    }
});
But I can't figure out how to call dispose() for that subscription. Calling subscribe with an Observer as the argument returns void, and subscribeWith does not accept my Observer without compile errors.
How is this supposed to work? What am I misunderstanding here?
The Javadoc of Observable has a straightforward example:
Disposable d = Observable.just("Hello world!")
    .delay(1, TimeUnit.SECONDS)
    .subscribeWith(new DisposableObserver<String>() {
        @Override public void onStart() {
            System.out.println("Start!");
        }
        @Override public void onNext(String t) {
            System.out.println(t);
        }
        @Override public void onError(Throwable t) {
            t.printStackTrace();
        }
        @Override public void onComplete() {
            System.out.println("Done!");
        }
    });
Thread.sleep(500);
// the sequence can now be disposed via dispose()
d.dispose();
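As far as I understand it, subscribeWith hands back the very observer you passed in, typed as that observer's own class. A plain anonymous Observer is not a Disposable, so assigning the result to a Disposable presumably is what fails to compile in your case, whereas a DisposableObserver is both an Observer and a Disposable. The sketch below models that idea in plain Java without RxJava. Every type in it (MiniDisposable, MiniObserver, MiniDisposableObserver, MiniSource) is a hypothetical stand-in for illustration and not the library's real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class SubscribeWithSketch {
    // Hypothetical stand-ins for RxJava's Disposable and Observer; NOT the real API.
    interface MiniDisposable {
        void dispose();
        boolean isDisposed();
    }

    interface MiniObserver<T> {
        void onNext(T value);
    }

    // Models the idea behind DisposableObserver: one object that is
    // both the observer and the handle used to cancel it.
    abstract static class MiniDisposableObserver<T>
            implements MiniObserver<T>, MiniDisposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        @Override public void dispose() { disposed.set(true); }
        @Override public boolean isDisposed() { return disposed.get(); }
    }

    // Hypothetical source whose values are pushed manually via emit().
    static class MiniSource<T> {
        private MiniObserver<? super T> observer;

        // Returns the argument typed as E, so the caller keeps every
        // interface the observer implements (here: MiniDisposable).
        <E extends MiniObserver<? super T>> E subscribeWith(E observer) {
            this.observer = observer;
            return observer;
        }

        void emit(T value) {
            if (observer == null) {
                return;
            }
            // Skip delivery once the observer has been disposed.
            if (observer instanceof MiniDisposable
                    && ((MiniDisposable) observer).isDisposed()) {
                return;
            }
            observer.onNext(value);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSource<Long> src = new MiniSource<>();
        // Compiles because the anonymous class is also a MiniDisposable.
        MiniDisposable d = src.subscribeWith(new MiniDisposableObserver<Long>() {
            @Override public void onNext(Long value) {
                log.add("onNext " + value);
            }
        });
        src.emit(0L);
        src.emit(1L);
        d.dispose();
        src.emit(2L); // ignored: the observer is disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext 0, onNext 1]
    }
}
```

Applied to your code, this suggests extending DisposableObserver<Long> instead of implementing Observer<Long> directly and keeping the value returned by subscribeWith.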
Edit
The following examples are ways to get the Disposable out of the onSubscribe method, but they are generally not recommended:
// field in the owner class
Disposable disposable;
public void doReactive() {
    Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            disposable = d;
        }
        // ...
    });
}
public void cleanup() {
    if (disposable != null) {
        disposable.dispose();
        disposable = null;
    }
}
or
SerialDisposable sd = new SerialDisposable();
Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        sd.set(d);
    }
    // ...
});
// ...
sd.dispose();
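One difference between the two variants, as I read the SerialDisposable Javadoc: set() disposes the incoming Disposable right away if the container has already been disposed, whereas the plain field version would keep a live Disposable if cleanup() happened to run before onSubscribe. Below is a minimal plain-Java model of that behavior. The types MiniDisposable, Flag and MiniSerial are hypothetical and only imitate the real class. They are not RxJava code.

```java
import java.util.concurrent.atomic.AtomicReference;

class SerialHolderSketch {
    // Hypothetical stand-in for RxJava's Disposable; NOT the real API.
    interface MiniDisposable {
        void dispose();
        boolean isDisposed();
    }

    // Simplest possible disposable: remembers whether dispose() was called.
    static final class Flag implements MiniDisposable {
        private volatile boolean disposed;
        @Override public void dispose() { disposed = true; }
        @Override public boolean isDisposed() { return disposed; }
    }

    // Hypothetical model of a serial container: holds one disposable at a
    // time and disposes any disposable that arrives after it was disposed.
    static final class MiniSerial implements MiniDisposable {
        private static final MiniDisposable DISPOSED = new Flag(); // sentinel
        private final AtomicReference<MiniDisposable> current =
                new AtomicReference<>();

        void set(MiniDisposable next) {
            for (;;) {
                MiniDisposable prev = current.get();
                if (prev == DISPOSED) {
                    next.dispose(); // container already disposed
                    return;
                }
                if (current.compareAndSet(prev, next)) {
                    if (prev != null) {
                        prev.dispose(); // replace and dispose the old one
                    }
                    return;
                }
            }
        }

        @Override public void dispose() {
            MiniDisposable prev = current.getAndSet(DISPOSED);
            if (prev != null && prev != DISPOSED) {
                prev.dispose();
            }
        }

        @Override public boolean isDisposed() {
            return current.get() == DISPOSED;
        }
    }

    // Simulates cleanup running before the upstream handle is delivered.
    static boolean lateSetIsDisposed() {
        MiniSerial sd = new MiniSerial();
        sd.dispose();               // cleanup happens first
        Flag upstream = new Flag();
        sd.set(upstream);           // the handle arrives late
        return upstream.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(lateSetIsDisposed()); // prints true
    }
}
```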