javajava-8rx-javaexponential-backoff

Exponential Backoff in RxJava


I have an API that takes an Observable that triggers an event.

I want to return an Observable that emits a value every defaultDelay seconds if an Internet connection is detected, and delays numberOfFailedAttempts^2 times if there's no connection.

I've tried a bunch of various styles, the biggest problem I'm having is retryWhen's observable is only evaluated once:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

Is there any way to do what I'm attempting to do? I found a related question (can't find it searching right now), but the approach taken didn't seem to work with a dynamic value.


Solution

  • In your code there are two mistakes:

    1. In order to repeat some observable sequence, that sequence has to be finite. I.e. instead of interval you'd better use something like just, or fromCallable as I did in sample below.
    2. From repeatWhen's inner function you need to return new delayed observable source, so instead of observable.delay() you have to return Observable.timer().

    Working code:

    public void testRepeat() throws InterruptedException {
        logger.info("test start");
    
        int DEFAULT_DELAY = 100; // ms
        int ADDITIONAL_DELAY = 100; // ms
        AtomicInteger generator = new AtomicInteger(0);
        AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
    
        Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
                .repeatWhen(counts -> {
                    AtomicInteger retryCounter = new AtomicInteger(0);
                    return counts.flatMap(c -> {
                        int retry = 0;
                        if (connectionAlive.get()) {
                            retryCounter.set(0); // reset counter
                        } else {
                            retry = retryCounter.incrementAndGet();
                        }
                        int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                        logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                        return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                    });
                })
                .subscribe(v -> logger.info("got {}", v));
    
        Thread.sleep(220);
        logger.info("connection dropped");
        connectionAlive.set(false);
        Thread.sleep(2000);
        logger.info("connection is back alive");
        connectionAlive.set(true);
        Thread.sleep(2000);
        subscription.dispose();
        logger.info("test complete");
    }
    

    See detailed article about repeatWhen here.