java android rx-java rx-java2 rx-android

RxJava: chain observables dynamically


I have a chained observable that I created like this:

Disposable disposable = currentUsedAdapter.connect(ip)
        .observeOn(AndroidSchedulers.mainThread())
        .concatMap(fallbackAdapter(ProtocolType.V2))
        .delay(500, TimeUnit.MILLISECONDS)
        .concatMap(fallbackAdapter(ProtocolType.V1))
        .subscribeWith(connectionSubscriber);

This is the fallbackAdapter method:

private Function<Boolean, Observable<Boolean>> fallbackAdapter(ProtocolType protocolType) {
    return new Function<Boolean, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> apply(@NonNull Boolean isConnected) throws Exception {
            if (isConnected) {
                return Observable.just(true);
            } else {
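                // Connect through the fallback adapter for this protocol
                // (assumes TempAdapter exposes the same connect(...) as currentUsedAdapter).
                TempAdapter adapter = new TempAdapter(context, protocolType);
                return adapter.connect(ipAddress);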
            }
        }
    };
}

Currently this is done statically, and it works fine. However, I want to build a list of those fallbackAdapter(ProtocolType.*) calls, because I only know the number of fallbacks at runtime.

So I created this:

ArrayList<Function<Boolean, Observable<Boolean>>> adaptersList = new ArrayList<>();
adaptersList.add(fallbackAdapter(ProtocolType.V2));
...
adaptersList.add(fallbackAdapter(ProtocolType.V9));

Disposable disposable = Observable.fromIterable(adaptersList)
        .concatMap(adapter ->
                adapter.apply(true))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(connectionSubscriber);

I now have a list that I can update dynamically, but I am not sure how to pass the value of isConnected from one adapter to the next. I currently pass true to every adapter, but some of them should receive false, and I don't see how to hand this value from one emitter to the next when using Observable.fromIterable.

So my question is: how should I change .concatMap(adapter -> adapter.apply(true)) so that, instead of always sending true, each adapter receives the value produced by the previous adapter?

Thank you.


Solution

  • If it helps anyone: I didn't find an RxJava way to solve this, so I solved it the old-fashioned Java way. I created a builder class that appends each fallback observable to my main observable and returns the resulting chain at the end. Something like this:

    public class DisposableBuilder {
        Observable<Boolean> observable;
    
        public DisposableBuilder() {
        }
    
        public void build(String ip) {
            observable = currentUsedAdapter.connect(ip);
            if (adaptersNames != null) {
                for (int i = 1; i < adaptersNames.size(); i++) { // skip the first adapter (it is currentUsedAdapter)
                    this.append(AdapterFactory.getAdapter(context, adaptersNames.get(i)));
                }
            }
        }
    
        public void append(CustomAdapter adapter) {
            observable = observable
                    .delay(200, TimeUnit.MILLISECONDS)
                    .concatMap(fallbackAdapter(adapter));
        }
    
        public Observable<Boolean> getObservable() {
            return observable;
        }
    }
    

    Then I used it like this:

    disposableBuilder.build(ip);
    this.disposable = disposableBuilder.getObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(connectionSubscriber);
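
The builder above amounts to a fold: start from the first connection attempt and apply each fallback function to the running chain, so every step receives the previous step's result. Observable.fromIterable cannot do this on its own, because it maps each list element independently. Below is a minimal sketch of the fold that runs without RxJava: it substitutes CompletableFuture.thenCompose for Observable.concatMap, and the names FallbackChain, connect, fallback, and chain are hypothetical stand-ins, not part of the original code. With RxJava, the loop body would presumably become result = result.concatMap(f) over Observable<Boolean>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FallbackChain {

    // Hypothetical stand-in for an adapter's connect(): records the attempt
    // and completes immediately with the given result.
    static CompletableFuture<Boolean> connect(String name, boolean succeeds, List<String> attempts) {
        attempts.add(name);
        return CompletableFuture.completedFuture(succeeds);
    }

    // Mirrors fallbackAdapter(...): pass a success through unchanged,
    // otherwise try this adapter.
    static Function<Boolean, CompletableFuture<Boolean>> fallback(
            String name, boolean succeeds, List<String> attempts) {
        return isConnected -> isConnected
                ? CompletableFuture.completedFuture(true)
                : connect(name, succeeds, attempts);
    }

    // Folds the fallbacks onto the first attempt, so each function
    // receives the value produced by the previous step.
    static CompletableFuture<Boolean> chain(
            CompletableFuture<Boolean> first,
            List<Function<Boolean, CompletableFuture<Boolean>>> fallbacks) {
        CompletableFuture<Boolean> result = first;
        for (Function<Boolean, CompletableFuture<Boolean>> f : fallbacks) {
            result = result.thenCompose(f); // assumed RxJava analogue: result.concatMap(f)
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> attempts = new ArrayList<>();
        List<Function<Boolean, CompletableFuture<Boolean>>> fallbacks = new ArrayList<>();
        fallbacks.add(fallback("V2", false, attempts));
        fallbacks.add(fallback("V3", true, attempts));
        fallbacks.add(fallback("V4", true, attempts)); // never attempted: V3 already connected

        boolean connected = chain(connect("V1", false, attempts), fallbacks).join();
        System.out.println(connected + " " + attempts); // prints "true [V1, V2, V3]"
    }
}
```

Because the list is only walked while the chain is being assembled, it can be filled at runtime with any number of fallbacks, which is the same idea as calling append(...) in a loop in the builder.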