There are different kinds of events in my application. Let's call them for simplicity a-event, b-events, c-events and so on. I need to implement throttling for each kind of events separately:
@Test
void test() throws InterruptedException {
var sink = Sinks.many().unicast().<String>onBackpressureBuffer();
sink.asFlux()
.groupBy(e -> e.charAt(0))
.flatMap(g -> g
.doOnNext(e -> System.out.println("got " + e))
.timeout(Duration.ofMillis(150), Mono.defer(() -> {
System.out.println("timeout");
return Mono.empty();
}))
.sample(Duration.ofMillis(100))
.doOnNext(e -> System.out.println("processed " + e)))
.subscribe();
sink.tryEmitNext("a1");
Thread.sleep(150);
sink.tryEmitNext("a2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
And also I need to close a queue for each kind of messages after a timeount so it doesn't consume any resources.
It works in general. If delay between a1 and a2 is 50 ms, then I get:
got a1
got a2
processed a2
timeout
a1 is not processed because of throttling.
If delay is 250 ms it works fine as well:
got a1
processed a1
timeout
got a2
processed a2
timeout
The message queue for a-events is cleared after timeout, created the new one and a2 is processed.
The problem is when delay is 150 ms. Sometimes it receives a2 but doesn't process it:
got a1
processed a1
got a2
timeout
How to make this schema more stable. So it doesn't loose last event because of timeout?
The solution is to store last event and return it in timeout fallback:
sink.asFlux()
.groupBy(e -> e.charAt(0))
.flatMap(g -> {
var last = new AtomicReference<String>();
return g
.doOnNext(e -> {
System.out.println("got " + e);
last.set(e);
})
.sample(Duration.ofMillis(100))
.timeout(Duration.ofMillis(150), Mono.defer(() -> {
System.out.println("timeout");
return Mono.justOrEmpty(last.get());
}))
.doOnNext(e -> {
last.set(null);
System.out.println("processed " + e);
});
})
.subscribe();
last.set(null); guarantees that the last event will not be processed twice.
Also timeout should be after sample. Otherwise a2 event can be skipped at all.
The output is as follows, so a2 processed even after timeout:
got a1
processed a1
got a2
timeout
processed a2