I have a problem to understand a chained "RXJava-Retrofit" API call. I got inspired by this and implement this class named ObservationLoader
to load the data from the API bucket per bucket. When the end of data is reached the API sends a endOfRecords=true
:
public Observable<PageObject<Observation>> getAllObservationDataByRegion(long taxonKey,
String regionId) {
final PublishSubject<PageObject<Observation>> subject = PublishSubject.create();
return subject.doOnSubscribe(disposable -> {
this.getData(taxonKey, regionId, 0).subscribe(subject);
})
.doOnNext(observationPageObject -> {
if (observationPageObject.isEndOfRecords()) {
// -> list is completely loaded
subject.onComplete();
} else {
int nextOffset = observationPageObject.getOffset() + 1;
this.getData(taxonKey, regionId, null, nextOffset).subscribe(subject);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
private Observable<PageObject<Observation>> getData(long id,
String regionId,
int offset) {
// Get your API response value
return this.api.getObservations(id, regionId, ObservationLoader.PAGE_LIMIT, offset);
}
In my Android fragment HomeFragment
I subscribe to the ObservationLoader
:
ObservationLoader loader = new ObservationLoader(this.getApi());
Observable<PageObject<Observation>> observable = loader
.getAllObservationDataByRegion(this.getSelectedSpecies(), this.getSelectedRegion());
observable.subscribe(new Observer<PageObject<Observation>>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "ON_SUBSCRIBE");
}
@Override
public void onNext(PageObject<Observation> observationPageObject) {
Log.i(TAG, "ON_NEXT");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "ERROR = " + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "COMPLETED");
}
});
I can see that the onSubscribe()
and doOnSubscribe()
are called and even the getData()
is reached. I assume the API is responding correctly (a previous attempt attempt with recursion worked fine). But I never reached the doOnNext
function. The observer goes straight to onComplete()
and no data is received. What could be the reason?
When doOnSubscribe
runs, the doesn't see any consumers yet so if getData
is synchronous, there won't be any first results to trigger further results. Also if getData
ends, it will complete the setup so the next getData
call in doOnNext
will push to an already terminated subject, ingoring all data.
You'll need a differently organized feedback loop:
// we loop back the nextOffset, in a thread-safe manner
Subject<Integer> subject = PublishSubject.<Integer>create()
.toSerialized();
// bootstrap with 0 and keep open for more offsets
subject.mergeWith(Observable.just(0))
// get the data for the current offset
.concatMap(nextOffset -> getData(taxonKey, regionId, nextOffset)
.subscribeOn(Schedulers.io())
)
// if the response is end of records, stop
.takeWhile(observationPageObject -> !observationPageObject.isEndOfRecords())
// otherwise not end of records, feedback the new offset
.doOnNext(observationPageObject ->
subject.onNext(observationPageObject.getOffset() + 1)
)
// get the data on the main thread
.observeOn(AndroidSchedulers.mainThread());