
Join sorted Flux Producers efficiently


I need to merge large data streams, each available as a Flux, whose entries contain a timestamp and a value. If timestamps match, the values need to be summed. Each Flux is sorted by timestamp in ascending order.

For smaller streams I would use the groupBy operator, but since each Flux holds many entries, this isn't efficient.

I would like to exploit the fact that the entries in each Flux are ordered, but I can't find the right construct. What are the tools to achieve something like this? Below is some pseudocode of what I want to do:

    // ZonedDateTime.parse needs an offset or zone, hence the trailing "Z"
    var flux1 = Flux.just(
            new Data(ZonedDateTime.parse("2025-01-01T00:00:00Z"), 1.0),
            new Data(ZonedDateTime.parse("2025-03-01T00:00:00Z"), 1.0)
    );

    var flux2 = Flux.just(
            new Data(ZonedDateTime.parse("2025-02-01T00:00:00Z"), 2.0),
            new Data(ZonedDateTime.parse("2025-03-01T00:00:00Z"), 2.0),
            new Data(ZonedDateTime.parse("2025-04-01T00:00:00Z"), 2.0)
    );

    var flux3 = Flux.just(
            new Data(ZonedDateTime.parse("2025-02-01T00:00:00Z"), 5.0)
    );

    var input = List.of(flux1, flux2, flux3);


    // Pseudocode: Flux has no blocking "take the next element" call (Flux::next returns a Mono<Data>)
    var output = Flux.<Data>create(sink -> {
        List<Data> nextEntries = new ArrayList<>(input.stream().map(Flux::next).toList());

        do {
            ZonedDateTime nextTimestamp = nextEntries.stream().map(Data::getTimestamp).min(ZonedDateTime::compareTo).get();
            List<Integer> affectedStreams = IntStream.range(0, input.size())
                    .filter(i -> nextTimestamp.isEqual(nextEntries.get(i).getTimestamp()))
                    .boxed()
                    .toList();
            double nextOutput = affectedStreams.stream().mapToDouble(i -> nextEntries.get(i).getValue()).sum();
            sink.next(new Data(nextTimestamp, nextOutput));
            affectedStreams.forEach(i -> nextEntries.set(i, input.get(i).next()));
        } while (!allFluxAreConsumed);
    });

    // expected output:
    // [
    //      Data(ZonedDateTime.parse("2025-01-01T00:00:00Z"), 1.0),
    //      Data(ZonedDateTime.parse("2025-02-01T00:00:00Z"), 7.0),
    //      Data(ZonedDateTime.parse("2025-03-01T00:00:00Z"), 3.0),
    //      Data(ZonedDateTime.parse("2025-04-01T00:00:00Z"), 2.0)
    // ]
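For reference, the pull-based loop in the pseudocode is a classic k-way merge. Below is a minimal, blocking sketch of that idea in plain Java with no Reactor dependency, assuming each input can be iterated as an `Iterable<Data>`. The `SortedMergeSum` class, the `mergeSum` method and this `Data` record (using `LocalDateTime` instead of `ZonedDateTime`) are hypothetical. A `PriorityQueue` keyed on each input's current head yields the smallest pending timestamp in O(log k) for k inputs.

```java
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

public class SortedMergeSum {

    // Hypothetical stand-in for the question's Data class.
    public record Data(LocalDateTime timestamp, double value) {}

    // The current head element of one input together with the rest of that input.
    private record Cursor(Data head, Iterator<Data> rest) {}

    // Merges inputs that are each sorted by ascending timestamp and emits one Data per
    // distinct timestamp, summing the values of all records sharing it.
    // Only one pending element per input is held in the queue at any time.
    public static void mergeSum(List<? extends Iterable<Data>> inputs, Consumer<Data> sink) {
        PriorityQueue<Cursor> heads =
                new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head().timestamp()));
        for (Iterable<Data> input : inputs) {
            Iterator<Data> it = input.iterator();
            if (it.hasNext()) {
                heads.add(new Cursor(it.next(), it));
            }
        }
        while (!heads.isEmpty()) {
            LocalDateTime ts = heads.peek().head().timestamp();
            double sum = 0.0;
            // Consume every head carrying the smallest timestamp, advancing each of those inputs.
            while (!heads.isEmpty() && heads.peek().head().timestamp().equals(ts)) {
                Cursor c = heads.poll();
                sum += c.head().value();
                if (c.rest().hasNext()) {
                    heads.add(new Cursor(c.rest().next(), c.rest()));
                }
            }
            sink.accept(new Data(ts, sum));
        }
    }

    public static void main(String[] args) {
        List<List<Data>> input = List.of(
                List.of(new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1.0),
                        new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1.0)),
                List.of(new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2.0),
                        new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2.0),
                        new Data(LocalDateTime.parse("2025-04-01T00:00:00"), 2.0)),
                List.of(new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5.0)));
        mergeSum(input, System.out::println);
    }
}
```

Because only one pending element per input is queued, memory stays proportional to the number of inputs rather than to the stream length. Feeding a Flux into this shape would require blocking (for example via `Flux.toIterable()`), which is what the non-blocking operators in the answer below avoid.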

Solution

  • You can get the expected result by stacking the following operators:

    1. Combine the input fluxes while preserving the overall date ordering, using the mergeComparing operator. WARNING: mergeComparing needs a pending element from every input that has not completed before it can emit, so if one of the input fluxes is slow, it slows down the entire downstream pipeline.
    2. Use windowUntilChanged to group adjacent records that share the same timestamp.
    3. Use reduce on each window to merge its records as you wish (here, by summing the values).

    This gives something like:

    Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
        .windowUntilChanged(Data::datetime)
        .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));
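The warning in step 1 can be made concrete. `mergeComparing` only emits once it holds a pending value from every input that has not completed, so an input that never produces anything stalls the whole merge. A minimal sketch, assuming reactor-core 3.4 or later, with `Flux.never()` standing in for an extremely slow producer; the `MergeComparingStall` class and `emittedWithin` method are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

public class MergeComparingStall {

    public record Data(LocalDateTime datetime, int value) {}

    static final Comparator<Data> BY_TIME = Comparator.comparing(Data::datetime);

    // Returns whatever mergeComparing emitted within the given time window.
    @SuppressWarnings("unchecked")
    public static List<Data> emittedWithin(Duration window) {
        Flux<Data> ready = Flux.just(
                new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1));
        // Stands in for a slow source that has not produced its first element yet.
        Flux<Data> silent = Flux.never();
        return Flux.mergeComparing(BY_TIME, ready, silent)
                .take(window)   // stop listening once the window has elapsed
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(emittedWithin(Duration.ofMillis(200)));
    }
}
```

Nothing is emitted within the window, even though the other input has already completed. With real sources the effect is milder: the merged output advances only as fast as the slowest input.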
    

    You can test the pipeline with a unit test like so:

    import org.junit.jupiter.api.Test;
    import reactor.core.publisher.Flux;
    import reactor.test.StepVerifier;
    
    import java.time.LocalDateTime;
    import java.util.Comparator;
    
    public class MergeSorted {
    
        record Data(LocalDateTime datetime, Integer value) {}
    
        @Test
        public void test() {
    
            var flux1 = Flux.just(
                    new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                    new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1)
            );
    
    
            var flux2 = Flux.just(
                    new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
                    new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
                    new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
            );
    
            var flux3 = Flux.just(
                    new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5)
            );
    
            var mergeSum = Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
                               .windowUntilChanged(Data::datetime)
                               .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));
    
            StepVerifier.create(mergeSum)
                    .expectNext(
                                  new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                                  new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 7),
                                  new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 3),
                                  new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
                    )
                    .verifyComplete();
        }
    }
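A variant of the same pipeline, sketched under the assumption of reactor-core 3.4 or later (where `Flux.mergeComparing` is available): since each timestamp group only holds a few records, `bufferUntilChanged` can collect each group into a `List` and a plain `map` can sum it. This makes the output order explicit instead of relying on the windows completing one after another inside `flatMap`. The `MergeSumBuffered` class and `mergeSum` method are hypothetical names.

```java
import java.time.LocalDateTime;
import java.util.Comparator;
import reactor.core.publisher.Flux;

public class MergeSumBuffered {

    public record Data(LocalDateTime datetime, int value) {}

    static final Comparator<Data> BY_TIME = Comparator.comparing(Data::datetime);

    // Merges sources that are each sorted by datetime and sums records sharing a datetime.
    @SafeVarargs
    public static Flux<Data> mergeSum(Flux<Data>... sources) {
        return Flux.mergeComparing(BY_TIME, sources)
                // Collect each run of equal datetimes into a List; a run stays small as long as
                // few records share one timestamp (one per source if sources have no duplicates).
                .bufferUntilChanged(Data::datetime)
                // map preserves order one-to-one, so no flatMap/concatMap is involved.
                .map(group -> new Data(group.get(0).datetime(),
                        group.stream().mapToInt(Data::value).sum()));
    }

    public static void main(String[] args) {
        Flux<Data> flux1 = Flux.just(
                new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1));
        Flux<Data> flux2 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2));
        Flux<Data> flux3 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5));
        mergeSum(flux1, flux2, flux3).doOnNext(System.out::println).blockLast();
    }
}
```

The trade-off: a window streams its records into `reduce`, while a buffer holds one whole group in memory until the timestamp changes. That is negligible here because a group is bounded by how many records share one timestamp.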