I'm working on a Quarkus project using Mutiny and Hibernate Reactive Panache. In one of my API endpoints, I need to perform three concurrent database operations: one query and two updates.
My requirements are very specific:
- The database query (doSelect1) and one of the database updates (doUpdate1) must execute together. Both must succeed for the transaction to be considered successful.
- Once doSelect1 and doUpdate1 have completed, the API should immediately return the result of the doSelect1 query to the client.
- The other database update (doUpdate2) should be initiated. This operation should be "fire-and-forget":
  - The API response must not wait for doUpdate2 to complete.
  - If doUpdate2 fails, it should only log the error and must not cause the main transaction (with doSelect1 and doUpdate1) to roll back or affect the API response.
Here is my current implementation using Uni.combine():
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple3;
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import jakarta.transaction.Transactional;
// ...
@WithTransaction
public Uni<ResultVO> doLogin() {
    // I want doSelect1() and doUpdate1() to succeed together,
    // and for the method to return as soon as they are done.
    // doUpdate2() should run in the background without blocking the response.
    return Uni.combine().all().unis(doSelect1(), doUpdate1(), doUpdate2().onFailure().recoverWithNull())
            .asTuple()
            .onItem().transform(tuple -> tuple.getItem1()); // Return the result of doSelect1
}
public Uni<ResultVO> doSelect1() {
    // ... database query logic
}
public Uni<Void> doUpdate1() {
    // ... database update logic
}
// This should be a non-blocking, "fire-and-forget" operation
// in a new transaction.
@WithTransaction
@Transactional(Transactional.TxType.REQUIRES_NEW) // Start a new transaction
public Uni<Void> doUpdate2() {
    // ... another database update logic that might fail
    // and should not affect the main flow.
}
The Problem:
My current code almost works. The onFailure().recoverWithNull() on doUpdate2() correctly prevents its failure from causing the entire pipeline to fail. However, the Uni.combine().all() part still waits for all three Unis to complete before proceeding. This means the API response is blocked until doUpdate2 finishes, which violates my "fire-and-forget" requirement.
My Question:
How can I properly implement this non-blocking, fire-and-forget behavior for doUpdate2 while ensuring doSelect1 and doUpdate1 complete together in a transaction?
I have read some documentation and it seems that SmallRye Reactive Messaging might be suitable for this kind of asynchronous, background task. Is that the recommended approach here, or is there a simpler way to achieve this using only Mutiny? For example, is there a way to subscribe to doUpdate2 without having it be part of the blocking combine operation?
Any guidance would be greatly appreciated. Thank you!
I can't really answer your main question, but https://stackoverflow.com/a/76520504/6692043 seems to be what you are after.
EDIT: Though I'm told running parallel operations on the same Hibernate Reactive session is a big no-no, so you might want to use something like doSelect1().then(doUpdate1()) (or whatever the correct syntax is in Mutiny) rather than Uni.combine().all(...).
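For illustration, here is a minimal sketch of that control flow using only Mutiny. It assumes call() for the sequencing (it runs doUpdate1 after doSelect1 and keeps the doSelect1 result) and subscribe().with(...) for the fire-and-forget part. The class name LoginFlowSketch, the EVENTS list, and fireAndForgetUpdate2 are hypothetical, the three operations are in-memory stand-ins rather than real database calls, and String stands in for ResultVO. It says nothing about sessions or transactions.

```java
import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class LoginFlowSketch {
    // Hypothetical event log so the order of operations can be inspected.
    public static final List<String> EVENTS = new CopyOnWriteArrayList<>();

    // Hypothetical in-memory stand-in for the real query.
    static Uni<String> doSelect1() {
        return Uni.createFrom().item(() -> {
            EVENTS.add("select1");
            return "select1-result";
        });
    }

    // Hypothetical in-memory stand-in for the first update.
    static Uni<Void> doUpdate1() {
        return Uni.createFrom().voidItem().invoke(() -> EVENTS.add("update1"));
    }

    // Hypothetical stand-in for the second update; it always fails here
    // to show that the failure does not reach the caller of doLogin().
    static Uni<Void> doUpdate2() {
        return Uni.createFrom().failure(new IllegalStateException("update2 failed"));
    }

    public static Uni<String> doLogin() {
        return doSelect1()
                // Sequential, not parallel: doUpdate1 is subscribed to only after
                // doSelect1 has produced its item; call() keeps that item.
                .call(result -> doUpdate1())
                // Once both succeeded, trigger doUpdate2 without waiting for it.
                .onItem().invoke(result -> fireAndForgetUpdate2());
    }

    static void fireAndForgetUpdate2() {
        // A separate subscription: its outcome is only recorded, never propagated.
        doUpdate2().subscribe().with(
                ignored -> EVENTS.add("update2 ok"),
                failure -> EVENTS.add("update2 failed: " + failure.getMessage()));
    }

    public static void main(String[] args) {
        // Blocking here is only for the demo; an endpoint would return the Uni.
        String result = doLogin().await().indefinitely();
        System.out.println(result);
        System.out.println(EVENTS);
    }
}
```

In a real endpoint the separate subscription would still need its own session and transaction, and it should probably be triggered only after the main transaction has completed, so treat this as the shape of the pipeline rather than a drop-in solution.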
That being said, there is more.
@Transactional(Transactional.TxType.REQUIRES_NEW) on reactive methods is incorrect. Right now it most likely is ignored, or just performs some pointless work.
See https://quarkus.io/guides/hibernate-reactive-panache#transactions
I think @WithTransaction will by default behave like "REQUIRES_NEW". But... it reuses the surrounding session, which is really not good in your case: a Hibernate ORM/Reactive session cannot be used after a failure, which you indicate could happen in doUpdate2.
What you really need is a new transaction and a new session.
I do not think that's possible with Panache, unfortunately. With "plain" Hibernate Reactive, though, you can always start another transaction/session explicitly:
import jakarta.inject.Inject;
import org.hibernate.reactive.mutiny.Mutiny;
// ...
@Inject
Mutiny.SessionFactory sessionFactory;
// This should be a non-blocking, "fire-and-forget" operation
// in a new transaction.
public Uni<Void> doUpdate2() {
    return sessionFactory.withTransaction(session -> { // the given session is guaranteed to be a new one
        // ... another database update logic that might fail
        // and should not affect the main flow.
    });
}
Note: I am aware the experience is not awesome here, and we have plans to improve it: https://github.com/quarkusio/quarkus/issues/47698