javaandroidkotlinrx-javarx-java2

RxJava PublishSubject buffer elements with timeout


I want to achieve the following with RxJava:

  1. Buffer elements and publish them when 5 seconds passed after the last element
  2. Publish buffered elements in 20 seconds passed after the first element

Example of elements:

[A -> 2 seconds -> B -> 3 seconds -> C -> 6 seconds -> D -> 4 seconds -> E -> 9 seconds -> F -> 1 second -> G -> 15 seconds -> H]

The result should be:

[A, B, C]

[D, E, F]

[G, H]

For now, I can publish elements after 20-second delay after the first element is produced, what should I update to achieve the first part?

fun <T> Observable<T>.buffered(): Observable<List<T>> = publish { shared ->

    val startEvent = shared.throttleFirst(20, TimeUnit.SECONDS, scheduler)

    shared.buffer(startEvent.mergeWith(startEvent.delay(20, TimeUnit.SECONDS, scheduler, false)))

}

Solution

  • For rule 1, you need debounce. For rule 2, it gets complicated.

    In the sequence, the current first event should trigger a timer of 20 seconds, after which a signal goes out to start a new buffer. However, if there is a 5 second gap from rule 1, this 20 second timer has to be cancelled.

    Here is an example that prints out the times and the buffers to illustrate the solution:

    import java.util.concurrent.*;
    import io.reactivex.rxjava3.core.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    void main(String[] args) {
        
        // The signal pattern
        var source = Observable.fromArray(1, 2, 3)
        .concatWith(Observable.range(10, 25))
        .concatWith(Observable.range(45, 4))
        .flatMap(v -> Observable.just(v).delay(v, TimeUnit.SECONDS))
        .doOnNext(v -> System.out.println("Tick - " + v));
        
        // buffering action
        source.publish(shared -> {
            var db = shared.debounce(5, TimeUnit.SECONDS)
                    .doOnNext(v -> System.out.println("Debounce 5 seconds - " + v))
                    .publish()
                    .autoConnect();
    
            var stop = new AtomicBoolean(true);
    
            var wnd = shared
                    .take(1)
                    .doOnNext(v -> stop.set(false));
                    .delay(20, TimeUnit.SECONDS)
                    .takeUntil(db.doOnNext(w -> stop.set(false))
                    .repeatUntil(() -> stop.getAndSet(true))
                    .doOnNext(v -> System.out.println("Window 20 seconds - " + v));
            
            return shared.buffer(db.mergeWith(wnd));
        })
        .blockingSubscribe(System.out::println);
    }
    

    We have two signals, db for the 5 second debounce and wnd for the 20 second window.

    In the debounce part, we debounce for 5 seconds. Because we will need the debounce signal itself for the window part, we have to publish+autoConnect that part to avoid double debounce signals from two subscriptions.

    In the window part, we take one item from the shared source, and delay it by 20 seconds. However, if the debounce part signals first, we have to cancel the sequence so it doesn't create an unwanted window. Next the repeat will make sure if the 20 second passed or the delay was cancelled, we will start back up and wait for the next source item to repeat the process.

    Now the last part with stop comes from the issue that repeating the window blindly will result in a never ending sequence. If the source completes, the repeat will right there resubscribe-complete-resubscribe in a tight loop. To avoid this issue, we need to tell the repeat when to stop trying to resubscribe.

    For this, we use a stop flag. If the source produced an item, thus started the timer, we set the stop flag to false. Otherwise we did not receive an item to begin with and need to communicate this to the repeat - repeatUntil. repeatUntil will repeat if the function returns false, i.e., the default true value if there was no item. Then reset it for the next round. If the db signals first, we want to continue with the next round.