I need to merge large data streams available as Flux instances, each of which contains entries with a timestamp and a value. If the timestamps match, the values need to be summed up. The data in each Flux is sorted by timestamp in ascending order.
For smaller streams I would use the groupBy operator, but since the fluxes hold many entries this isn't efficient.
I would like to exploit the fact that the entries in each Flux are ordered, but I can't find the right construct. What are the tools to achieve something like this? Below is some pseudocode of what I want to do:
var flux1 = Flux.just(
        new Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
        new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 1.0)
);
var flux2 = Flux.just(
        new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 2.0),
        new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 2.0),
        new Data(ZonedDateTime.parse("2025-05-01T00:00:00"), 2.0)
);
var flux3 = Flux.just(
        new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 5.0)
);
var input = List.of(flux1, flux2, flux3);
var output = Flux.create(sink -> {
    // pseudocode: peek at the first pending entry of every input flux
    List<Data> nextEntries = input.stream().map(Flux::next).toList();
    do {
        // the smallest pending timestamp determines the next output element
        ZonedDateTime nextTimestamp = nextEntries.stream().map(Data::getTimestamp).min(ZonedDateTime::compareTo).get();
        List<Integer> affectedStreams = IntStream.range(0, input.size())
                .filter(i -> nextTimestamp.equals(nextEntries.get(i).getTimestamp()))
                .boxed().toList();
        double nextOutput = affectedStreams.stream().mapToDouble(i -> nextEntries.get(i).getValue()).sum();
        sink.next(new Data(nextTimestamp, nextOutput));
        // advance only the streams that contributed to this timestamp
        affectedStreams.forEach(i -> nextEntries.set(i, input.get(i).next()));
    } while (!allFluxAreConsumed); // pseudocode termination condition
});
// expected output:
// [
// Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
// Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 7.0),
// Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 3.0),
// Data(ZonedDateTime.parse("2025-05-01T00:00:00"), 2.0)
// ]
You can get the expected result by stacking the following operators:
- Flux.mergeComparing to merge the sorted fluxes into a single flux that is still ordered by timestamp,
- windowUntilChanged to group consecutive records that share the same timestamp,
- reduce on each window to merge records as you wish.
Which gives something like this:
Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
    .windowUntilChanged(Data::datetime)
    .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));
You can test it with a unit test like so:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.LocalDateTime;
import java.util.Comparator;
public class MergeSorted {

    record Data(LocalDateTime datetime, Integer value) {}

    @Test
    public void test() {
        var flux1 = Flux.just(
                new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1)
        );
        var flux2 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
        );
        var flux3 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5)
        );

        var mergeSum = Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
                .windowUntilChanged(Data::datetime)
                .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));

        StepVerifier.create(mergeSum)
                .expectNext(
                        new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                        new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 7),
                        new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 3),
                        new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
                )
                .verifyComplete();
    }
}
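This should also stay cheap for large streams: mergeComparing requests only a bounded prefetch from each source and windowUntilChanged keeps just the current group open, so it avoids the per-key state that makes groupBy awkward here. If your real Data type is the getter-based one from the question (ZonedDateTime timestamp, double value), the same pipeline should apply with only the accessor names swapped, roughly like this untested sketch (assuming getTimestamp()/getValue() and a (ZonedDateTime, double) constructor):

Flux<Data> merged = Flux
        .mergeComparing(Comparator.comparing(Data::getTimestamp), flux1, flux2, flux3)
        .windowUntilChanged(Data::getTimestamp)
        .flatMap(w -> w.reduce((a, b) -> new Data(a.getTimestamp(), a.getValue() + b.getValue())));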