I have two reactive methods in "Layer A" that are exposed to the rest of the app:
LAYER A - publicly accessible methods to the rest of the app
public Single<ResponseData> postMyData(ReqData data, Long id) {
    if (!isUserLoggedIn()) return postLogin().andThen(postData(data, id));
    return postData(data, id);
}

public Completable postLogin() {
    Account account = getAccountData();
    ReqLoginData loginData = new ReqLoginData(account.email, account.pass, OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET);
    Single<ResponseLogin> singlePostLogin = postLogin(loginData);
    return Completable.create(subscriber -> singlePostLogin.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loginResponse -> {
                // Success
                storeAccessToken(loginResponse); // *** this will recreate apiClient with new access token
                if (!subscriber.isDisposed()) subscriber.onComplete();
            }, throwable -> {
                if (!subscriber.isDisposed()) subscriber.onError(throwable);
            }));
}
LAYER B - only accessible by Layer A
public Single<ResponseLogin> postLogin(ReqLoginData loginData) {
    Log.d(TAG, "using apiClient " + apiClient.toString());
    Single<Response<ResponseLogin>> postLogin = apiClient.postLogin(loginData);
    return Single.create(subscriber -> postLogin.subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(response -> {
                if (response.body() != null) {
                    if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                } else {
                    if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                }
            }, throwable -> {
                throwable.printStackTrace();
                if (!subscriber.isDisposed()) subscriber.onError(throwable);
            }));
}

public Single<ResponseData> postData(ReqData data, Long id) {
    Log.d(TAG, "using apiClient " + apiClient.toString());
    Single<Response<ResponseData>> postData = apiClient.postData(id, data);
    return Single.create(subscriber -> postData.subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(response -> {
                if (response.body() != null) {
                    if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                } else {
                    if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                }
            }, throwable -> {
                throwable.printStackTrace();
                if (!subscriber.isDisposed()) subscriber.onError(throwable);
            }));
}
LAYER C - API Interface
@Headers({"Content-Type: application/json"})
@POST(Constants.API_PREFIX + "/auth/token/")
Single<Response<ResponseLogin>> postLogin(@Body ReqLoginData reqLoginData);
@Headers({"Content-Type: application/json"})
@POST(Constants.API_PREFIX + "/data/{id}/")
Single<Response<ResponseData>> postData(@Path("id") Long id, @Body ReqData reqData);
A huge challenge for me at the moment is making sure the user is logged in before attempting to POST data, so using postLogin().andThen(postData(data, id)) made sense to me.
During my late-night development, it seemed that both methods, postLogin() and postData(), were called in parallel, with postLogin() first.
How can I make postMyData() execute postLogin() first, wait for success or failure, and then execute postData() only on success?
Note: postLogin() updates apiClient with the new token inside storeAccessToken(). I now see that postData() doesn't use a reference to the new apiClient. It's important that postData() uses the new apiClient that's instantiated within storeAccessToken().
Sorry for the newbie question :-(
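It turned out that Completable.andThen(Single) works very well for serializing execution. Unfortunately, I didn't realize (half asleep) that only the code inside the create() lambda is deferred until subscription; everything outside it runs as soon as the method is called, which is when the chain is assembled and before the login has happened.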
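To make the timing concrete, here is a minimal plain-Java model of the same effect. It does not use RxJava: Runnable and Supplier stand in for Completable and Single, and the names DeferDemo, postDataEager(), postDataDeferred() and the string-valued apiClient are hypothetical, chosen only for illustration. It is a sketch of the principle, not the real code.

```java
import java.util.function.Supplier;

public class DeferDemo {
    // Hypothetical stand-in for the real apiClient; it only carries a token label.
    static String apiClient = "client(old-token)";

    // Stand-in for postLogin(): when run, it replaces the client,
    // as storeAccessToken() does in the question.
    static Runnable postLogin() {
        return () -> apiClient = "client(new-token)";
    }

    // Eager version: apiClient is read when the method is called (assembly time).
    static Supplier<String> postDataEager() {
        String captured = apiClient;             // runs immediately
        return () -> "posted with " + captured;
    }

    // Deferred version: apiClient is read only when the work actually runs.
    static Supplier<String> postDataDeferred() {
        return () -> "posted with " + apiClient; // runs on get()
    }

    // Models login.andThen(next): 'next' has already been built by the caller
    // before 'login' runs, because Java evaluates arguments first.
    static String andThen(Runnable login, Supplier<String> next) {
        login.run();
        return next.get();
    }

    static String runEager() {
        apiClient = "client(old-token)";
        return andThen(postLogin(), postDataEager());
    }

    static String runDeferred() {
        apiClient = "client(old-token)";
        return andThen(postLogin(), postDataDeferred());
    }

    public static void main(String[] args) {
        System.out.println(runEager());    // posted with client(old-token)
        System.out.println(runDeferred()); // posted with client(new-token)
    }
}
```

In both runs the login executes before the post, so the ordering itself was never the problem; only the moment at which apiClient is read differs. In RxJava 2 the same deferral can also be expressed at the call site with Single.defer(() -> postData(data, id)) as the argument to andThen(), which postpones the call to postData() until the login Completable has completed.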
My code below fixes the problem I had (both methods are in LAYER B):
public Single<ResponseLogin> postLogin(ReqLoginData loginData) {
    return Single.create(subscriber -> {
        Log.d(TAG, "using apiClient " + apiClient.toString());
        apiClient.postLogin(loginData)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(response -> {
                    if (response.isSuccessful() && response.body() != null) {
                        if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                    } else {
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    if (!subscriber.isDisposed()) subscriber.onError(throwable);
                });
    });
}

public Single<ResponseData> postData(ReqData data, Long id) {
    return Single.create(subscriber -> {
        Log.d(TAG, "using apiClient " + apiClient.toString());
        apiClient.postData(id, data)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(response -> {
                    if (response.body() != null) {
                        if (!subscriber.isDisposed()) subscriber.onSuccess(response.body());
                    } else {
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    if (!subscriber.isDisposed()) subscriber.onError(throwable);
                });
    });
}