javareactive-programmingreactive-streamsmutiny

Mutiny - Propagate completion to parent multi / polling


I am writing a little polling mechanism using Mutiny, part of me learning the library and i am kinda stuck in cancelling the polling when result is found. I tried using the tick() and what i came up with looks like

Multi.createFrom().ticks().every(Duration.ofSeconds(5))
    .onItem().transformToMultiAndMerge(tick -> {
      System.out.println("Tick:" + tick);
      return Multi.createFrom()
          .<Transaction>emitter(
              emitter -> {
                service.getTransactions().toMulti()
                    .onItem().transformToMultiAndMerge(
                        transactions -> Multi.createFrom().iterable(transactions))
                    .subscribe().with(transaction -> {
                      if (!verification.isOngoing()) {
                        emitter.fail(new TransactionVerificationException());
                      } else {
                        boolean transactionFound = transaction.getAmount().stream().anyMatch(
                            amount -> amount.getQuantity()
                                .equals("test"));
                        if (transactionFound) {
                          emitter.emit(transaction);
                          emitter.complete();
                        } 
                      }
                    });
              });
    })
    .subscribe()
    .with(transaction -> log.info(transaction),
        x -> x.printStackTrace());

Problem here is that the Multi from ticks() is running forever and the only way i think of to cancel it would be to propagate somehow that the emitter has completed. The case here is that i want to emit, and process only if certain conditions are met.


Solution

  • You approach is almost correct, though,

    Here down what the stream pipeline would look like:

    Multi.createFrom()
            .ticks()
            .every(Duration.ofSeconds(5))
            .onItem()
            // flat map the ticks to the `service#getTransactions` result
            .transformToMultiAndMerge(tick -> service.getTransactions()
                    .toMulti()
                    .onItem()
                    // flatten Collection<Transaction> to Multi<Transaction>
                    .transformToIterable(Function.identity())
                    .onItem()
                    .transformToMultiAndMerge(transaction -> {
                        if (!verification.isOngoing()) {
                            return Multi.createFrom().failure(new TransactionVerificationException());
                        } else {
                            boolean transactionFound = transaction.getAmount()
                                    .stream()
                                    .anyMatch(amount -> amount.getQuantity().equals("test"));
                            if (transactionFound) {
                                return Multi.createFrom().item(transaction);
                            } else {
                                return Multi.createFrom().empty();
                            }
                        }
                    })
            )
            .select()
            .first(Objects::nonNull)
            .toUni()
            .subscribe()
            .with(transaction -> log.info(transaction), x -> x.printStackTrace());