rx-java2

How to sequentially execute a "Completable" followed by "Single"?


I have two reactive methods in "Layer A" that's exposed to the app:

LAYER A - publicly accessible methods to the rest of the app

public Single<ResponseData> postMyData(ReqData data, Long id) {
    if (!isUserLoggedIn()) return postLogin().andThen(postData(data, id));
    return postData(data, id);
}

public Completable postLogin() {
    Account account = getAccountData();
    ReqLoginData loginData = new ReqLoginData(account.email, account.pass, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET);

    Single<ResponseLogin> singlePostLogin = postLogin(loginData);

    return Completable.create(subscriber -> singlePostLogin.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loginResponse -> {
                        // Success
                        storeAccessToken(loginResponse);// *** this will recreate apiClient with new access token
                        if (!subscriber.isDisposed()) subscriber.onComplete();
                    }, throwable -> { if (!subscriber.isDisposed()) subscriber.onError(throwable);}
            ));
}

LAYER B - only accessible by Layer A

public Single<ResponseLogin> postLogin(ReqLoginData loginData) {
    Log.d(TAG, "using apiClient "+apiClient.toString());
    Single<Response<ResponseLogin>> postLogin = apiClient.postLogin(loginData);

    return Single.create(subscriber -> postLogin.subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(response -> {
                if(response.body() != null) {
                    if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                } else {
                    if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                }
            }, throwable -> {
                throwable.printStackTrace();
                if(!subscriber.isDisposed()) subscriber.onError(throwable);
            }));
}

public Single<ResponseData> postData(ReqData data, Long id) {
    Log.d(TAG, "using apiClient "+apiClient.toString());
    Single<Response<ResponseData>> postData = apiClient.postData(id, data);

    return Single.create(subscriber -> postData.subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(response -> {
                if(response.body() != null) {
                    if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                } else {
                    if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                }
            }, throwable -> {
                throwable.printStackTrace();
                if(!subscriber.isDisposed()) subscriber.onError(throwable);
            }));
}

LAYER C - API Interface

@Headers({"Content-Type: application/json"})
@POST(Constants.API_PREFIX + "/auth/token/")
Single<Response<ResponseLogin>> postLogin(@Body ReqLoginData reqLoginData);

@Headers({"Content-Type: application/json"})
@POST(Constants.API_PREFIX + "/data/{id}/")
Single<Response<ResponseData>> postData(@Path("id") Long id, @Body ReqData reqData);

A huge challenge for me at the moment is to make sure the user is logged-in before attempting to POST data. So, using postLogin().andThen(postData(data, id)) made sense to me :

At the time of my late-night development, it seemed that both methods postLogin() and postData() are called in parallel - with postLogin() first.

How can I make postMyData() execute postLogin() first, wait for success or failure, followed by postData() on success ?

Note: postLogin() will update apiClient with the new token inside storeAccessToken(). I now see that postData() doesn't use a reference to the new apiClient. It's important that postData() will use the new apiClient that's instantiated within storeAccessToken()

Sorry for the newbie question :-(


Solution

  • it turned out that Completable.andThen(Single) works very well to serialize code execution. Unfortunately, I didn't (half asleep) realize that only code within the create() method was actually scheduled.

    my code below answers the problem I had (these are both in LAYER-B):

    public Single<ResponseLogin> postLogin(ReqLoginData loginData) {
    
        return Single.create(subscriber -> {
            Log.d(TAG, "using apiClient "+apiClient.toString());
            apiClient.postLogin(loginData)
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .subscribe(response -> {
                        if(response.isSuccessful() && response.body() != null) {
                            if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                        } else {
                            if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                        }
                    }, throwable -> {
                        throwable.printStackTrace();
                        if(!subscriber.isDisposed()) subscriber.onError(throwable);
                    });
        });
    }
    
    
    public Single<ResponseData> postData(ReqData data, Long id) {
    
        return Single.create(subscriber -> {
            Log.d(TAG, "using apiClient "+apiClient.toString());
            apiClient.postData(id, data)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(response -> {
                    if(response.body() != null) {
                        if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                    } else {
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    if(!subscriber.isDisposed()) subscriber.onError(throwable);
                });
        });
    }