Imagine a task that executes repeatedly, with a delay of at most 10 seconds between each execution. There is also an external signal that causes the task to execute immediately. (After the external signal, it does not matter whether the timer is reset back to 10 seconds.)
I've tried modeling this in two ways, but neither is ideal:
1. Use a Flux.interval(Duration.ofSeconds(10)) for timer-based execution. Use a hot Flux created from Sinks.many(), which gets fed the external signal. Merge the two fluxes, and execute the task in a concatMap operator on the result of the merge.
   The problem with this approach is that the signals can "pile up" ahead of the execution. Adding an onBackpressureLatest() mitigates heap exhaustion, but doesn't prevent several signals from getting queued up if one execution of the task takes longer than the delay interval.
2. Add a delay() followed by a repeat(). The challenge here is that I can't figure out how to get the delay to reliably end early when the hot external trigger flux emits a value. Mono.firstWithValue is almost what I want, but it subscribes to the external trigger flux each time, which means it's possible to miss a signal.
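For concreteness, the two attempts might look roughly like the sketch below. It assumes reactor-core 3.4 or later on the classpath; the class name, the task(String) helper, and the "timer" marker value are hypothetical and exist only for illustration.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SchedulingAttempts {

    // Hypothetical stand-in for the real task; reports what triggered the run.
    static Mono<String> task(String trigger) {
        return Mono.fromSupplier(() -> "ran after " + trigger);
    }

    // Attempt 1: merge a periodic timer with the external trigger.
    // concatMap prefetches from upstream, so several signals can still
    // queue up behind a slow task even with onBackpressureLatest().
    static Flux<String> mergeAttempt(Flux<String> trigger, Duration period) {
        Flux<String> timer = Flux.interval(period).map(tick -> "timer");
        return timer.mergeWith(trigger)
                .onBackpressureLatest()
                .concatMap(SchedulingAttempts::task);
    }

    // Attempt 2: race a delay against the next trigger, run the task, repeat.
    // Every repetition subscribes to the trigger again, so a signal emitted
    // while the task is running is not observed.
    static Flux<String> repeatAttempt(Flux<String> trigger, Duration delay) {
        return Mono.firstWithValue(
                        Mono.delay(delay).map(tick -> "timer"),
                        trigger.next())
                .flatMap(SchedulingAttempts::task)
                .repeat();
    }
}
```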
How can this sort of task scheduling be implemented using Reactor?
You should look into Flux.windowTimeout(int, Duration).
With windowTimeout(1, Duration.ofSeconds(10)), the resulting outer Flux will emit inner Fluxes that end whenever one element is received, or when 10 seconds have passed, whichever comes first. Then you can attach your task to the end of the windows.
Example:
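    // Hot source of external trigger signals (for example, the Flux view of a Sinks.Many)
    Flux<String> externalSignalFlux;

    public Mono<String> task() {
        return Mono.just("Task completed");
    }

    public Flux<String> windowTimeoutDemo() {
        return externalSignalFlux
                // Each window closes after 1 signal or after 10 seconds, whichever comes first
                .windowTimeout(1, Duration.ofSeconds(10))
                // Run the task once the window has completed
                .concatMap(window -> window.then(task()));
    }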
If you need to process the value of the signal:
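    // Hot source of external trigger signals (for example, the Flux view of a Sinks.Many)
    Flux<String> externalSignalFlux;

    public Mono<String> task(String signal) {
        return Mono.just("Task completed: " + signal);
    }

    public Flux<String> windowTimeoutDemo() {
        return externalSignalFlux
                .windowTimeout(1, Duration.ofSeconds(10))
                // A window that timed out is empty, so substitute a placeholder value
                .concatMap(window -> window.defaultIfEmpty("repeatedTask"))
                // Run the task with either the external signal or the placeholder
                .concatMap(this::task);
    }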
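The snippets above leave externalSignalFlux unassigned. Below is a minimal runnable sketch of one way to wire it up, assuming reactor-core 3.4 or later and a multicast Sinks.Many as the hot source (the question mentions Sinks.many() but not which flavor). The class name, the "timer" placeholder, and the shortened 500 ms timeout are assumptions made for the demo.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class WindowTimeoutDemo {

    // Hypothetical task: tags each run with what triggered it.
    static Mono<String> task(String trigger) {
        return Mono.fromSupplier(() -> "Task ran (" + trigger + ")");
    }

    // Runs the task after each external signal, or after maxDelay without one.
    static Flux<String> schedule(Flux<String> externalSignals, Duration maxDelay) {
        return externalSignals
                .windowTimeout(1, maxDelay)
                // Empty window means the timeout elapsed without a signal
                .concatMap(window -> window.defaultIfEmpty("timer"))
                .concatMap(WindowTimeoutDemo::task);
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed sink flavor; any hot Flux<String> would work here.
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
        List<String> results = new CopyOnWriteArrayList<>();

        Disposable subscription = schedule(sink.asFlux(), Duration.ofMillis(500))
                .subscribe(results::add);

        Thread.sleep(200);
        sink.tryEmitNext("external"); // closes the first window early
        Thread.sleep(1200);           // leaves time for timeout-triggered runs
        subscription.dispose();

        results.forEach(System.out::println);
    }
}
```

Because the pipeline subscribes to the hot source only once, the subscription stays in place while the task runs, unlike the resubscribe-per-iteration approach from the question. The first printed line should be the externally triggered run, followed by timer-triggered runs; the exact count depends on timing.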
Note: While Flux.bufferTimeout(int, Duration) looks very similar, its timeout only starts when the first element of a buffer is received, so nothing is emitted while no signals arrive, causing gaps in the resulting Flux.
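The difference can be observed with a source that never emits. The following is a rough sketch, assuming reactor-core 3.4 or later; the class and method names are hypothetical, and the timings are arbitrary demo values.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class BufferVsWindowTimeout {

    // Number of buffers emitted within the observation period when no signal ever arrives.
    static int buffersFromSilentSource(Duration timeout, Duration observe) {
        List<List<String>> buffers = Flux.<String>never()
                .bufferTimeout(1, timeout)
                .take(observe)
                .collectList()
                .block();
        return buffers.size();
    }

    // Number of windows completed within the observation period when no signal ever arrives.
    static int windowsFromSilentSource(Duration timeout, Duration observe) {
        List<Long> windowSizes = Flux.<String>never()
                .windowTimeout(1, timeout)
                .concatMap(window -> window.count()) // each timed-out window has size 0
                .take(observe)
                .collectList()
                .block();
        return windowSizes.size();
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMillis(100);
        Duration observe = Duration.ofMillis(450);
        System.out.println("buffers: " + buffersFromSilentSource(timeout, observe));
        System.out.println("windows: " + windowsFromSilentSource(timeout, observe));
    }
}
```

With a silent source, the buffer count is expected to stay at zero while the window count grows with the observation period, which is why only windowTimeout can drive the periodic execution.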