java project-reactor

Process events with throttling and timeout


My application has several kinds of events; for simplicity, call them a-events, b-events, c-events, and so on. I need to throttle each kind of event separately:

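    import java.time.Duration;

    import org.junit.jupiter.api.Test; // JUnit 5 assumed
    import reactor.core.publisher.Mono;
    import reactor.core.publisher.Sinks;

    @Test
    void test() throws InterruptedException {
        var sink = Sinks.many().unicast().<String>onBackpressureBuffer();

        sink.asFlux()
                // One group per kind of event, keyed by the first character.
                .groupBy(e -> e.charAt(0))
                .flatMap(g -> g
                        .doOnNext(e -> System.out.println("got " + e))
                        // Complete the group after 150 ms without events.
                        .timeout(Duration.ofMillis(150), Mono.defer(() -> {
                            System.out.println("timeout");
                            return Mono.empty();
                        }))
                        // Throttle: emit the latest event of each 100 ms period.
                        .sample(Duration.ofMillis(100))
                        .doOnNext(e -> System.out.println("processed " + e)))
                .subscribe();

        sink.tryEmitNext("a1");
        Thread.sleep(150); // delay between a1 and a2
        sink.tryEmitNext("a2");

        // Keep the test alive until the pipeline has finished.
        Thread.sleep(1000);
    }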

I also need to close the queue for each kind of event after a timeout so that it doesn't consume any resources.

This works in general. If the delay between a1 and a2 is 50 ms, I get:

    got a1
    got a2
    processed a2
    timeout

a1 is not processed because of throttling.
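
To make the throttling behaviour concrete, here is a minimal plain-Java sketch of periodic sampling. It is a simplified model, not Reactor's implementation: the class `SampleModel` and its method are hypothetical, ticks are assumed to fire at multiples of the period counted from subscription, and the timeout is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of periodic sampling (not Reactor code).
final class SampleModel {

    // arrivalMillis[i] is the arrival time of values[i], in milliseconds
    // since subscription; both arrays must be sorted by arrival time.
    // At every tick (period, 2 * period, ...) the latest value that arrived
    // since the previous tick is emitted; a tick with no new value emits nothing.
    static List<String> sample(long[] arrivalMillis, String[] values, long periodMillis) {
        List<String> emitted = new ArrayList<>();
        int i = 0;
        for (long tick = periodMillis; i < values.length; tick += periodMillis) {
            String latest = null;
            // Consume everything that arrived before this tick, keeping the newest.
            while (i < values.length && arrivalMillis[i] < tick) {
                latest = values[i];
                i++;
            }
            if (latest != null) {
                emitted.add(latest);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // a1 at 0 ms and a2 at 50 ms share the first 100 ms period.
        System.out.println(sample(new long[] {0, 50}, new String[] {"a1", "a2"}, 100));
        // a1 at 0 ms and a2 at 250 ms fall into different periods.
        System.out.println(sample(new long[] {0, 250}, new String[] {"a1", "a2"}, 100));
    }
}
```

In this model, two events that arrive within the same period collapse into the later one, which matches a1 being dropped when a2 follows 50 ms later.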

If the delay is 250 ms, it works fine as well:

    got a1
    processed a1
    timeout
    got a2
    processed a2
    timeout

After the timeout, the message queue for a-events is closed; a new one is created when a2 arrives, and a2 is processed.

The problem appears when the delay is 150 ms. Sometimes a2 is received but never processed:

    got a1
    processed a1
    got a2
    timeout

How can I make this scheme more stable, so that it doesn't lose the last event because of the timeout?


Solution

  • The solution is to store the last event and return it from the timeout fallback:

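            // Requires java.util.concurrent.atomic.AtomicReference.
            sink.asFlux()
                    .groupBy(e -> e.charAt(0))
                    .flatMap(g -> {
                        // Holds the most recent event of this group that has not been processed yet.
                        var last = new AtomicReference<String>();
                        return g
                                .doOnNext(e -> {
                                    System.out.println("got " + e);
                                    last.set(e);
                                })
                                .sample(Duration.ofMillis(100))
                                // On timeout, emit the pending event (if any), then complete.
                                .timeout(Duration.ofMillis(150), Mono.defer(() -> {
                                    System.out.println("timeout");
                                    return Mono.justOrEmpty(last.get());
                                }))
                                .doOnNext(e -> {
                                    // Clear the slot so the fallback does not replay this event.
                                    last.set(null);
                                    System.out.println("processed " + e);
                                });
                    })
                    .subscribe();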
    

    Calling last.set(null) guarantees that the last event is not processed twice.

    Also, timeout should come after sample; otherwise the a2 event can be skipped altogether.

    The output is as follows; a2 is processed even after the timeout:

    got a1
    processed a1
    got a2
    timeout
    processed a2
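
The "remember the last event, hand it out at most once" idea can also be isolated in a small helper. The following is only a sketch in plain Java: the class `LastEvent` and its method names are hypothetical and not part of Reactor. As a variation on the answer's `last.set(null)`, it assumes that clearing should happen only if the stored event is still the one that was just processed, which it does with `compareAndSet`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not a Reactor class): remembers the most recent event
// until it is processed or handed out once by the timeout fallback.
final class LastEvent<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();

    // Call when an event arrives (the first doOnNext in the answer).
    void remember(T event) {
        ref.set(event);
    }

    // Call after an event has been processed (the second doOnNext).
    // The slot is cleared only if it still holds that same instance
    // (compareAndSet compares references), so a newer pending event stays.
    void markProcessed(T event) {
        ref.compareAndSet(event, null);
    }

    // Call from the timeout fallback: returns the pending event at most once.
    Optional<T> takePending() {
        return Optional.ofNullable(ref.getAndSet(null));
    }

    public static void main(String[] args) {
        LastEvent<String> last = new LastEvent<>();
        String a1 = "a1";
        String a2 = "a2";
        last.remember(a1);
        last.remember(a2);       // a2 arrives before a1 is marked as processed
        last.markProcessed(a1);  // leaves a2 in place
        System.out.println(last.takePending().isPresent()); // a2 is still pending
        System.out.println(last.takePending().isPresent()); // already taken
    }
}
```

In the pipeline above, the fallback could then return `Mono.justOrEmpty(last.takePending())`, since `Mono.justOrEmpty` also accepts an `Optional`. Whether the extra `compareAndSet` check matters depends on how events and sample ticks interleave; treat it as a defensive choice rather than a required fix.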