I am writing a small polling mechanism with Mutiny as a way of learning the library, and I am stuck on cancelling the polling once a result is found. I tried using ticks(), and what I came up with looks like this:
Multi.createFrom().ticks().every(Duration.ofSeconds(5))
    .onItem().transformToMultiAndMerge(tick -> {
        System.out.println("Tick:" + tick);
        return Multi.createFrom()
            .<Transaction>emitter(
                emitter -> {
                    service.getTransactions().toMulti()
                        .onItem().transformToMultiAndMerge(
                            transactions -> Multi.createFrom().iterable(transactions))
                        .subscribe().with(transaction -> {
                            if (!verification.isOngoing()) {
                                emitter.fail(new TransactionVerificationException());
                            } else {
                                boolean transactionFound = transaction.getAmount().stream().anyMatch(
                                    amount -> amount.getQuantity()
                                        .equals("test"));
                                if (transactionFound) {
                                    emitter.emit(transaction);
                                    emitter.complete();
                                }
                            }
                        });
                });
    })
    .subscribe()
    .with(transaction -> log.info(transaction),
        x -> x.printStackTrace());
The problem is that the Multi from ticks() runs forever, and the only way I can think of to cancel it is to somehow propagate that the emitter has completed. What I want is to emit and process an item only if certain conditions are met.
Your approach is almost correct, though there is no need to create a MultiEmitter out of an existing Multi (or transformed Uni), as you can leverage the different Multi operators on top of your source service#getTransactions result:

- use the EmptyMulti source (Multi.createFrom().empty()), which automatically completes the downstream subscriber chain and which you can use to signal the absence of a valid item (i.e. a Transaction)
- transform the Multi to a Uni, which results in the upstream subscription being cancelled automatically once an item is received

Here is what the stream pipeline would look like:
Multi.createFrom()
    .ticks()
    .every(Duration.ofSeconds(5))
    .onItem()
    // flat map the ticks to the `service#getTransactions` result
    .transformToMultiAndMerge(tick -> service.getTransactions()
        .toMulti()
        .onItem()
        // flatten Collection<Transaction> to Multi<Transaction>
        .transformToIterable(Function.identity())
        .onItem()
        .transformToMultiAndMerge(transaction -> {
            if (!verification.isOngoing()) {
                return Multi.createFrom().failure(new TransactionVerificationException());
            } else {
                boolean transactionFound = transaction.getAmount()
                    .stream()
                    .anyMatch(amount -> amount.getQuantity().equals("test"));
                if (transactionFound) {
                    return Multi.createFrom().item(transaction);
                } else {
                    return Multi.createFrom().empty();
                }
            }
        })
    )
    .select()
    .first(Objects::nonNull)
    // toUni() resolves with the first item and then cancels the upstream, which stops the ticks
    .toUni()
    .subscribe()
    .with(transaction -> log.info(transaction), x -> x.printStackTrace());
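
If it helps to see the "stop polling on the first match" idea in isolation, here is a minimal sketch that deliberately does not use Mutiny. It swaps in plain JDK classes: a ScheduledExecutorService stands in for ticks(), and a CompletableFuture plays the role of the Uni. The class PollUntilFound and its poll method are hypothetical names made up for this illustration, and the Supplier is an assumed stand-in for service.getTransactions().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of Mutiny: a JDK-only analogue of the pipeline above.
public class PollUntilFound {

    /**
     * Calls source every periodMillis until one element matches wanted,
     * then completes the returned future and stops the periodic task.
     */
    public static <T> CompletableFuture<T> poll(Supplier<List<T>> source,
                                                Predicate<T> wanted,
                                                long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<T> result = new CompletableFuture<>();

        // The periodic task is the analogue of ticks(): each run fetches and filters.
        ScheduledFuture<?> ticks = scheduler.scheduleAtFixedRate(() -> {
            try {
                source.get().stream()
                      .filter(wanted)
                      .findFirst()
                      .ifPresent(result::complete); // no match: do nothing, wait for next tick
            } catch (RuntimeException e) {
                result.completeExceptionally(e);    // analogue of a failure item
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        // Once the future is done (item, failure, or caller cancellation), stop ticking.
        result.whenComplete((item, failure) -> {
            ticks.cancel(false);
            scheduler.shutdown();
        });
        return result;
    }
}
```

The point of the sketch is the last step: the consumer of the single result is what tears down the periodic source, which is the same role toUni() plays in the Mutiny pipeline when it cancels the upstream after the first item.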