android zip rx-java2 operator-keyword retrywhen

How to fetch observables in parallel when only one API call has retry logic


I want to implement logic using RxJava in my Android application that requires three parallel API calls. Only the third API call has retry logic. If it succeeds within three attempts, a subsequent call is made to a fourth API; otherwise only the results of the first and second API calls are passed on to the subscriber.

I tried to achieve this using the zip operator, but got stuck on the retry logic for the third API call.

    Observable<String> observable1 = Observable.just("A","B");

    Observable<Integer> observable2 = Observable.just(1,2);

    Observable<Boolean> observable3 = Observable.just(Boolean.TRUE, Boolean.FALSE);

    Observable.zip(observable1, observable2, observable3, new Function3<String, Integer, Boolean, Object>() {
        @Override
        public Object apply(String s, Integer integer, Boolean aBoolean) throws Exception {
            // RxJava 2 never emits null, so this branch is never taken
            if (aBoolean == null) {
                // this chain is built but never subscribed to, so the retry never runs
                observable3.retry(3).doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        // the result will never be used
                    }
                });
            }

            return s + integer + aBoolean;
        }
    }).subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Object o) {
            Log.e("onNext-->", o.toString());
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

Solution

  • If any Observable passed to zip fails, zip fails the whole stream. The only way I know to get both parallel execution and error handling with zip is to add onErrorResumeNext to each Observable, mapping the error to a new model that you deal with later, and then to handle whatever you want to do in the zip mapping function. For example:

    Observable.zip(
        observable1.onErrorResumeNext((Throwable t) -> Observable.just(new Model(t))),
        observable2.onErrorResumeNext((Throwable t) -> Observable.just(new Model(t))),
        observable3
            .retry(3, t -> t instanceof TimeoutException) // retry up to 3 times on timeout; put your retry logic here
            .onErrorResumeNext((Throwable t) -> Observable.just(new Model(t))),
        (m1, m2, m3) -> new Result());
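The same pattern (wrap each failure in a model so the combining step never fails, and retry only the third call) can also be seen without any RxJava types. The following is only a rough plain-Java analogue using CompletableFuture from the standard library, not RxJava code. The names ParallelCalls, Outcome, guarded, withRetry and fetchAll are made up for illustration, and the calls are assumed to be blocking suppliers that throw a RuntimeException on failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ParallelCalls {

    /** Hypothetical wrapper: holds either a value or the error that replaced it. */
    static final class Outcome<T> {
        final T value;
        final Throwable error;
        Outcome(T value, Throwable error) { this.value = value; this.error = error; }
        boolean isSuccess() { return error == null; }
    }

    /** Runs the call asynchronously and turns a failure into an Outcome instead of propagating it. */
    static <T> CompletableFuture<Outcome<T>> guarded(Supplier<T> call) {
        return CompletableFuture.supplyAsync(call)
                .handle((value, error) -> new Outcome<>(value, error));
    }

    /** Invokes the call up to maxAttempts times (assumed >= 1), rethrowing the last failure. */
    static <T> Supplier<T> withRetry(Supplier<T> call, int maxAttempts) {
        return () -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return call.get();
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last;
        };
    }

    /** Starts the three calls in parallel; the fourth call runs only if the third one succeeded. */
    static String fetchAll(Supplier<String> api1, Supplier<Integer> api2,
                           Supplier<Boolean> api3, Supplier<String> api4) {
        CompletableFuture<Outcome<String>> f1 = guarded(api1);
        CompletableFuture<Outcome<Integer>> f2 = guarded(api2);
        CompletableFuture<Outcome<Boolean>> f3 = guarded(withRetry(api3, 3));
        CompletableFuture.allOf(f1, f2, f3).join(); // does not throw: failures are wrapped in Outcome

        String base = f1.join().value + "" + f2.join().value;
        return f3.join().isSuccess() ? base + f3.join().value + api4.get() : base;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String result = fetchAll(
                () -> "A",
                () -> 1,
                () -> { // fails twice, then succeeds on the third attempt
                    if (attempts.incrementAndGet() < 3) throw new IllegalStateException("timeout");
                    return Boolean.TRUE;
                },
                () -> "-fourth");
        System.out.println(result); // prints A1true-fourth
    }
}
```

If the third call fails on all three attempts, fetchAll returns only the combined result of the first two calls, which matches the fallback described in the question.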