java rx-java rx-java2

retryWhen hanging after migrating from rxjava to rxjava2


We have the following methods which we have recently migrated from rx-java to rx-java2:

@Override
public Observable<FooResponse> doFoo(String id) {
    return api.doFoo(id)
        .map(Response::body)
        .retryWhen(errors -> shouldRetry(errors))
        .switchIfEmpty(Observable.error(new FooTimeoutException(id)));
}

private Observable<Long> shouldRetry(Observable<? extends Throwable> errors) {
    return errors.filter(e -> e instanceof ClientException)
        .cast(ClientException.class)
        .map(ClientException::getStatusCode)
        .flatMap(statusCode -> statusCode == 1 || statusCode == 2
            ? Observable.just(statusCode)
            : errors.flatMap(Observable::error))
        .zipWith(Observable.range(1, 3), (n, i) -> i)
        .doOnNext(retryCount -> log.info("Retrying {}", retryCount))
        .flatMap(retryCount -> Observable.timer(500, TimeUnit.MILLISECONDS));
}

This method should retry when api.doFoo fails with a ClientException whose status code is 1 or 2. Any other error should be propagated to the caller without a retry.

This worked fine in rx-java, but now that we're migrating to rx-java2, our unit tests hang when the status code is not 1 or 2.

Has the syntax around this functionality changed between 1 and 2? How should this be structured?


Solution

  • I've resolved this issue with a bit of a refactor. My solution now looks like this:

    public Observable<FooResponse> doFoo(String id) {
        return api.doFoo(id)
            .map(Response::body)
            .retryWhen(errors -> shouldRetry(errors))
            .switchIfEmpty(Observable.error(new FooTimeoutException(id)));
    }
    
    private Observable<Long> shouldRetry(Observable<? extends Throwable> errors) {
        return errors.flatMap(this::retryableOrError)
            .zipWith(Observable.range(1, 3), (n, i) -> i)
            .doOnNext(retryCount -> log.info("Retrying {}", retryCount))
            .flatMap(retryCount -> Observable.timer(500, TimeUnit.MILLISECONDS));
    }
    
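    // Pass retryable errors through; turn anything else into a terminal error.
    private Observable<Throwable> retryableOrError(Throwable e) {
        // Only a ClientException with status code 1 or 2 should trigger a retry.
        if (e instanceof ClientException clientException
                && (clientException.getStatusCode() == 1 || clientException.getStatusCode() == 2)) {
            return Observable.just(e);
        } else {
            // Propagate the error we already hold instead of re-subscribing to the errors stream.
            return Observable.error(e);
        }
    }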
    

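    I'm not totally sure why this works. There are two differences from the original: errors that are not a ClientException are no longer filtered out, and the non-retryable branch now returns Observable.error(e) for the error in hand instead of subscribing to errors a second time via errors.flatMap(Observable::error). Either way, it seems to have done the job.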
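
A possible explanation for the hang, offered as a guess and not a confirmed diagnosis: the original non-retryable branch reacted to an error by subscribing to the same errors stream again. As far as I can tell, RxJava 2 backs that stream with a publish-style subject that does not replay items to late subscribers, so the second subscription would never see the error that had already been emitted and retryWhen would wait forever. The sketch below models only that one property in plain Java, without RxJava. PublishLikeSource and LateSubscriberDemo are hypothetical names made up for illustration; they are not RxJava classes and do not reproduce its internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot, non-replaying stream; NOT an RxJava class.
class PublishLikeSource<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // Register a listener; it only receives items emitted after this call.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver an item to everyone who is subscribed at the moment of the call.
    void emit(T item) {
        // Iterate over a copy so a listener may add subscribers while we loop.
        for (Consumer<T> s : new ArrayList<>(subscribers)) {
            s.accept(item);
        }
    }
}

class LateSubscriberDemo {
    // Mimics the original non-retryable branch: on an error, subscribe to the
    // same stream again and wait for the error to arrive there.
    static List<String> resubscribeOnError() {
        PublishLikeSource<String> errors = new PublishLikeSource<>();
        List<String> seenByInner = new ArrayList<>();
        errors.subscribe(e -> errors.subscribe(seenByInner::add));
        errors.emit("status 3");
        return seenByInner;
    }

    // Mimics the refactored branch: act on the error value already in hand.
    static List<String> useErrorInHand() {
        PublishLikeSource<String> errors = new PublishLikeSource<>();
        List<String> seen = new ArrayList<>();
        errors.subscribe(seen::add);
        errors.emit("status 3");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("re-subscribing saw: " + resubscribeOnError());
        System.out.println("value in hand saw:  " + useErrorInHand());
    }
}
```

In this model the re-subscribing variant ends up with an empty list, because the inner listener attaches only after "status 3" has been delivered, while the second variant sees the error. That matches the refactor, where retryableOrError returns Observable.error(e) for the error it was handed.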