java project-reactor

Reactor GroupedFlux - wait to complete


Given an async publisher like the one below, is there a way in Project Reactor to wait until the entire stream has finished processing?
Ideally without having to add a sleep of some arbitrary duration...


    @Test
    public void groupByPublishOn() throws InterruptedException {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();

        List<Integer> results = new ArrayList<>();
        Flux<Flux<Integer>> groupPublisher = processor.publish(1)
                                                      .autoConnect()
                                                      .groupBy(i -> i % 2)
                                                      .map(group -> group.publishOn(Schedulers.parallel()));

        groupPublisher.log()
                      .subscribe(g -> g.log()
                                       .subscribe(results::add));

        List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
        input.forEach(processor::onNext);
        processor.onComplete();

        Thread.sleep(500);

        Assert.assertEquals(input.size(), results.size());
    }

Solution

  • You can replace these lines:

    groupPublisher.log()
      .subscribe(g -> g.log().subscribe(results::add));
    

    with this:

    groupPublisher.log()
      .flatMap(g -> g.log().doOnNext(results::add))
      .blockLast();
    

    flatMap is a better pattern than subscribing inside a subscribe callback: it subscribes to each group for you and merges the groups back into a single sequence.

    doOnNext handles the consuming side effect (adding values to the collection), so you no longer need to do that in the subscriber.

    blockLast() replaces the subscribe call. Instead of letting you register handlers for the events, it blocks until the sequence completes. It also returns the last emitted item, but doOnNext has already taken care of the values.
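
For reference, here is a minimal self-contained sketch of the flatMap / doOnNext / blockLast() approach described above. It makes a few assumptions that are not in the original test: the class name `GroupByBlockLast` and the helper `collectAll` are hypothetical, the source is a plain `Flux.fromIterable` rather than a `UnicastProcessor`, and the results list is a `CopyOnWriteArrayList`, because each group's doOnNext may run on a different parallel worker thread and a plain `ArrayList` is not safe for concurrent adds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class GroupByBlockLast {

    // Hypothetical helper: groups the input by parity, processes each group
    // on the parallel scheduler, and blocks until every group has completed.
    static List<Integer> collectAll(List<Integer> input) {
        // Thread-safe list (an assumption of this sketch): the two groups
        // may call results::add concurrently from different worker threads.
        List<Integer> results = new CopyOnWriteArrayList<>();

        Flux.fromIterable(input)
            .groupBy(i -> i % 2)
            // flatMap subscribes to each group and merges them back together
            .flatMap(group -> group.publishOn(Schedulers.parallel())
                                   .doOnNext(results::add))
            // blocks the calling thread until the merged sequence completes
            .blockLast();

        return results;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 3, 5, 2, 4, 6, 11, 12, 13);
        List<Integer> results = collectAll(input);
        // No Thread.sleep needed: blockLast() has already waited for completion.
        System.out.println(results.size() == input.size());
    }
}
```

The order of elements in `results` is not guaranteed, since the two groups are processed concurrently; only the total count and the per-group order are stable. If only the collected values are needed, `collectList().block()` after the flatMap would be another option that avoids the side-effecting doOnNext entirely.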