javaandroidrx-javabehaviorsubjectpublishsubject

Why onSubscribe does not work in rxjava?


when I run below code if I don't write observeOn line, app crashes because getView().showBlockLayout(isBlock); invoke a method that try to hide or show a layout. but I tried to change below observeOn(AndroidSchedulers.mainThread()) to subscribeOn(AndroidSchedulers.mainThread()) and app crashes again!

subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean isBlock) {
                    getView().showBlockLayout(isBlock);
                    databaseHelper.getConference().setBlock(isBlock);
                    mConferenceModel.setBlock(isBlock);
                }
            }));

I also test this:

subscription.add(UserStore.getInstance().getBlockObservable(databaseHelper.getConference().getUserChatId())
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean isBlock) {
                    getView().showBlockLayout(isBlock);
                    databaseHelper.getConference().setBlock(isBlock);
                    mConferenceModel.setBlock(isBlock);
                }
            }));

and unexpectedly it worked and did not crash! I didn't use subscribeOn in getBlockObservable method(because I know we can set it once)

it's my UserStore class

PublishSubject<Pair<String,Boolean>> mObservableBlock;

private UserStore(){
    mObservableBlock = PublishSubject.create();
    mInstance = this;
}

public static UserStore getInstance() {
    if(mInstance == null)
        new UserStore();
    return mInstance;
}

public Observable<Boolean> getBlockObservable(final String userId){
    return mObservableBlock
            .observeOn(Schedulers.computation())
            .filter(new Func1<Pair<String,Boolean>, Boolean>() {
        @Override
        public Boolean call(Pair<String,Boolean> s) {
            if(userId.equals(s.first))
                return true;
            return false;
        }
    }).map(new Func1< Pair<String, Boolean>, Boolean>() {

        @Override
        public Boolean call(Pair<String, Boolean> UserBlock) {
            return UserBlock.second;
        }
    });
}
public void publishBlockedUser(String userId,boolean isBlock){
    mObservableBlock.onNext(new Pair<String, Boolean>(userId,isBlock));
}

and here is how I imported rxjava dependency in gradle

compile 'io.reactivex:rxjava:1.1.5'
compile 'io.reactivex:rxandroid:1.2.0'

Solution

  • As mentioned in this medium artice:

    One important fact is that subscribeOn does not work with Subjects.

    So you can't use subscribeOn with subjects and we have to use observerOn(AndroidSchedulers.mainThread()) before subscription. so all downstream methods are called on mainThread after that.

    check this medium artice