javaandroidrx-javareactive-programmingrx-android

How can I make this rxjava zip to run in parallel?


I have a sleep method for simulating a long running process.

private void sleep() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Then I have a method returns an Observable containing a list of 2 strings that is given in the parameters. It calls the sleep before return the strings back.

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            sleep();
            List<String> strings = new ArrayList<>();
            strings.add(str1);
            strings.add(str2);
            return strings;
        }
    });
}

Then I am calling the getStrings three times in Observalb.zip, I expect those three calls to run in parallel, so the total time of execution should be within 2 seconds or maybe 3 seconds the most because the sleep was only 2 seconds. However, it's taking a total of six seconds. How can I make this to run in parallel so it will finish within 2 seconds?

Observable
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(List<String> strings) {
        //Display the strings
    }
});

The mergeStringLists method

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Func3<List<String>, List<String>, List<String>, List<String>>() {
        @Override
        public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d(TAG, "...");

            for (String s : strings2) {
                strings.add(s);
            }

            for (String s : strings3) {
                strings.add(s);
            }

            return strings;
        }
    };
}

Solution

  • That's happening because subscribing to your zipped observable happens in the the same, io thread.

    Why don't you try this instead:

    Observable
        .zip(
            getStrings("One", "Two")
                .subscribeOn(Schedulers.newThread()),
            getStrings("Three", "Four")
                .subscribeOn(Schedulers.newThread()),
            getStrings("Five", "Six")
                .subscribeOn(Schedulers.newThread()),
            mergeStringLists())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<String>>() {
            @Override
            public void onCompleted() {
    
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
            public void onNext(List<String> strings) {
                //Display the strings
            }
        });
    

    Let me know if that helped