I'm trying to figure out whether it's possible to catch transaction events as described here, but with the reactive client.
If it's not possible, I would appreciate it if someone could provide an example of how this can be implemented manually. I want to be able to add business logic to my application before a transaction starts, before it commits, and after it commits, and I think events are best suited for that. Thanks in advance.
I've finally found a way to implement this manually. Hibernate Reactive has the following method in its Mutiny session implementation:

    Uni<T> executeInTransaction(Function<Mutiny.Transaction, Uni<T>> work) {
        return work.apply( this )
                // only flush() if the work completed with no exception
                .call( this::flush )
                .call( this::beforeCompletion )
                // in the case of an exception or cancellation
                // we need to rollback the transaction
                .onFailure().call( this::rollback )
                .onCancellation().call( this::rollback )
                // finally, when there was no exception,
                // commit or rollback the transaction
                .call( () -> rollback ? rollback() : commit() )
                .call( this::afterCompletion );
    }

As you can see, two methods (beforeCompletion and afterCompletion) are called in the chain, which lets us add custom logic before and after the transaction commit. These methods execute the contract implementations that were registered in the session's action queue. I'll show an example of the "before" event.
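
To make the ordering in that chain easier to see, here is a toy model of it built on plain CompletionStage. This is only a sketch under my own assumptions, not Hibernate Reactive code: ToyTransaction, BeforeProcess, registerProcess and the log list are hypothetical names, and the rollback handling is simplified (any failure just records "rollback").

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/** Toy model of the chain above. NOT Hibernate Reactive code; all names are hypothetical. */
final class ToyTransaction {

    /** Stand-in for a "before transaction completion" process. */
    interface BeforeProcess {
        CompletionStage<Void> run();
    }

    private final Queue<BeforeProcess> beforeQueue = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    /** Queues a process that must run before the commit step. */
    void registerProcess(BeforeProcess process) {
        beforeQueue.add(process);
    }

    /** Runs the work, then flush, the queued processes, commit and afterCompletion, in that order. */
    <T> CompletionStage<T> execute(Supplier<CompletionStage<T>> work) {
        return work.get()
                .thenCompose(result -> record("flush", result))
                .thenCompose(result -> beforeCompletion().thenApply(ignored -> result))
                .thenCompose(result -> record("commit", result))
                .thenCompose(result -> record("afterCompletion", result))
                // any failure along the way skips the remaining steps and is recorded as a rollback
                .whenComplete((result, failure) -> {
                    if (failure != null) {
                        log.add("rollback");
                    }
                });
    }

    /** Drains the queue; each process starts only after the previous one has completed. */
    private CompletionStage<Void> beforeCompletion() {
        CompletionStage<Void> chain = CompletableFuture.completedStage(null);
        BeforeProcess next;
        while ((next = beforeQueue.poll()) != null) {
            BeforeProcess process = next;
            chain = chain.thenCompose(ignored -> process.run());
        }
        return chain;
    }

    /** Records a step name and passes the result through unchanged. */
    private <T> CompletionStage<T> record(String step, T result) {
        log.add(step);
        return CompletableFuture.completedStage(result);
    }

    public static void main(String[] args) {
        ToyTransaction tx = new ToyTransaction();
        String result = tx.execute(() -> {
            // the work registers a process, which then runs between flush and commit
            tx.registerProcess(() -> {
                tx.log.add("validate");
                return CompletableFuture.completedStage(null);
            });
            return CompletableFuture.completedStage("saved");
        }).toCompletableFuture().join();
        System.out.println(result + " " + tx.log);
        // prints: saved [flush, validate, commit, afterCompletion]
    }
}
```

If a queued process returns a failed stage instead, the commit and afterCompletion steps are skipped in this model and only "rollback" is recorded, which is the property that makes the "before" hook usable for validation.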
First of all, we need some qualifier annotations. We are going to use the @Entity annotation (a custom one from com.example.annotation, not the JPA annotation) to bind event listeners to a specific entity type:

    package com.example.annotation;

    import java.io.Serial;
    import java.lang.annotation.Documented;
    import java.lang.annotation.Retention;
    import java.lang.annotation.Target;
    import javax.enterprise.util.AnnotationLiteral;
    import javax.inject.Qualifier;
    import lombok.EqualsAndHashCode;
    import com.example.model.BaseEntity;

    import static java.lang.annotation.ElementType.*;
    import static java.lang.annotation.RetentionPolicy.RUNTIME;

    @Qualifier
    @Retention(RUNTIME)
    @Target({METHOD, PARAMETER, FIELD, TYPE})
    @Documented
    public @interface Entity {

        Class<? extends BaseEntity> value();

        @EqualsAndHashCode(callSuper = true)
        final class Literal extends AnnotationLiteral<Entity> implements Entity {

            @Serial
            private static final long serialVersionUID = 2137611959567040656L;

            private final Class<? extends BaseEntity> value;

            private Literal(Class<? extends BaseEntity> value) {
                this.value = value;
            }

            public static Literal of(Class<? extends BaseEntity> value) {
                return new Literal(value);
            }

            @Override
            public Class<? extends BaseEntity> value() {
                return value;
            }
        }
    }

Let's imagine we have Book and Author entities in our service, and both of them extend a BaseEntity model/interface. This base entity bounds the type that the qualifier accepts, which is used later for the lookup.
We also need annotations for the CRUD actions. Here is the one for the "create" action (the "update" one is analogous):

    // package com.example.annotation.operation;
    // imports are the same as for the Entity annotation above (BaseEntity is not needed)
    @Qualifier
    @Retention(RUNTIME)
    @Target({METHOD, PARAMETER, FIELD, TYPE})
    @Documented
    public @interface Create {

        @EqualsAndHashCode(callSuper = true)
        final class Literal extends AnnotationLiteral<Create> implements Create {

            private static final long serialVersionUID = 2137611959567040656L;

            public static final Literal INSTANCE = new Literal();

            private Literal() {
            }
        }
    }

Next, we create the class that registers the processes when entity events fire. In this example it listens for "pre-insert" and "pre-update" events:

    package com.example.observer;

    import java.io.Serial;
    import java.lang.annotation.Annotation;
    import java.util.List;
    import javax.enterprise.context.ApplicationScoped;
    import javax.enterprise.inject.spi.CDI;
    import lombok.extern.slf4j.Slf4j;
    import org.hibernate.engine.spi.SessionImplementor;
    import org.hibernate.event.spi.*;
    import org.hibernate.reactive.session.ReactiveSession;
    import com.example.annotation.Entity;
    import com.example.annotation.operation.Create;
    import com.example.annotation.operation.Update;
    import com.example.model.BaseEntity;
    import com.example.observer.action.EventReactiveBeforeTransactionCompletionProcess;

    /**
     * {@inheritDoc}
     * <p>Component, which registers 'pre' events</p>
     */
    @Slf4j
    @ApplicationScoped
    public class TransactionProcessRegistrarEventListener implements PreInsertEventListener, PreUpdateEventListener {

        @Serial
        private static final long serialVersionUID = 6763048376606381859L;

        /**
         * {@inheritDoc}
         *
         * @param event event
         */
        @Override
        public boolean onPreInsert(PreInsertEvent event) {
            return register(event, Create.Literal.INSTANCE);
        }

        /**
         * {@inheritDoc}
         *
         * @param event event
         */
        @Override
        public boolean onPreUpdate(PreUpdateEvent event) {
            return register(event, Update.Literal.INSTANCE);
        }

        /**
         * Register processes
         *
         * @param event event
         * @return result
         */
        @SuppressWarnings("unchecked")
        private boolean register(AbstractPreDatabaseOperationEvent event, Annotation qualifier) {
            Class<? extends BaseEntity> clazz = (Class<? extends BaseEntity>) event.getEntity().getClass();
            log.debug("registering '{}' instances. Entity: {}", event.getClass().getSimpleName(), clazz.getSimpleName());
            final SessionImplementor session = event.getSession();
            // look up every process bean qualified for this entity type and operation
            List<EventReactiveBeforeTransactionCompletionProcess> beforeProcesses = CDI.current()
                    .select(EventReactiveBeforeTransactionCompletionProcess.class, Entity.Literal.of(clazz), qualifier)
                    .stream().toList();
            if (beforeProcesses.isEmpty())
                log.debug("no 'before' processes found");
            beforeProcesses.forEach(process -> {
                process.setEvent(event);
                ((ReactiveSession) session).getReactiveActionQueue()
                        .registerProcess(process);
                log.debug("process {} has been successfully registered", process.getClass().getSimpleName());
            });
            // false means the insert/update itself is not vetoed
            return false;
        }
    }

As you can see, the listener uses a custom EventReactiveBeforeTransactionCompletionProcess interface. It lets us attach the current event to the Hibernate process (and retrieve it later, when the process runs). Let's create it:

    package com.example.observer.action;

    import org.hibernate.event.spi.AbstractPreDatabaseOperationEvent;
    import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess;

    public interface EventReactiveBeforeTransactionCompletionProcess extends ReactiveBeforeTransactionCompletionProcess {

        <T extends AbstractPreDatabaseOperationEvent> void setEvent(T event);

        <T extends AbstractPreDatabaseOperationEvent> T getEvent();
    }

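
For readers who want to see the lookup idea without CDI or Hibernate on the classpath, here is a plain-Java sketch of what the registrar does: find the processes bound to an entity type and an operation, hand each one the event, and queue it. Everything in it is a stand-in I made up for illustration (ProcessLookupSketch, Key, Operation, EventAwareProcess, and the Book/Author records); the real code relies on CDI qualifiers and the reactive action queue instead of a map and a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Plain-Java analogy of the qualifier-based lookup; all names here are hypothetical. */
final class ProcessLookupSketch {

    enum Operation { CREATE, UPDATE }

    /** Plays the role of the two qualifiers (entity type + operation). */
    record Key(Class<?> entityType, Operation operation) {}

    /** Stand-in for a process that carries the event it was registered for. */
    static class EventAwareProcess {
        private Object event;

        void setEvent(Object event) { this.event = event; }

        Object getEvent() { return event; }
    }

    record Book(String title) {}

    record Author(String name) {}

    // factories create a fresh process per event, similar to @Dependent beans
    private final Map<Key, List<Supplier<EventAwareProcess>>> registry = new HashMap<>();

    /** Stand-in for the session's queue of "before completion" processes. */
    final List<EventAwareProcess> queue = new ArrayList<>();

    /** Binds a process factory to an entity type and operation. */
    void bind(Class<?> entityType, Operation operation, Supplier<EventAwareProcess> factory) {
        registry.computeIfAbsent(new Key(entityType, operation), key -> new ArrayList<>()).add(factory);
    }

    /** Queues every process bound to the entity's class and the operation; returns how many matched. */
    int register(Object entity, Operation operation) {
        List<Supplier<EventAwareProcess>> matches =
                registry.getOrDefault(new Key(entity.getClass(), operation), List.of());
        for (Supplier<EventAwareProcess> factory : matches) {
            EventAwareProcess process = factory.get();
            process.setEvent(entity); // in this toy the entity stands in for the Hibernate event
            queue.add(process);
        }
        return matches.size();
    }

    public static void main(String[] args) {
        ProcessLookupSketch sketch = new ProcessLookupSketch();
        sketch.bind(Book.class, Operation.CREATE, EventAwareProcess::new);

        System.out.println(sketch.register(new Book("Dune"), Operation.CREATE));     // prints 1
        System.out.println(sketch.register(new Author("Herbert"), Operation.CREATE)); // prints 0
        System.out.println(sketch.queue.get(0).getEvent());                          // prints Book[title=Dune]
    }
}
```

The point of carrying the event on the process is visible in the last line: by the time the queued process runs, it can still reach the entity that triggered it.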
Now we have everything we need to create a custom Hibernate integrator, which registers the listeners:

    package com.example.observer;

    import lombok.extern.slf4j.Slf4j;
    import org.hibernate.boot.Metadata;
    import org.hibernate.engine.spi.SessionFactoryImplementor;
    import org.hibernate.event.service.spi.EventListenerRegistry;
    import org.hibernate.event.spi.EventType;
    import org.hibernate.integrator.spi.Integrator;
    import org.hibernate.service.spi.SessionFactoryServiceRegistry;
    import javax.enterprise.context.ApplicationScoped;

    @Slf4j
    @ApplicationScoped
    public class EventListenerIntegrator implements Integrator {

        /**
         * {@inheritDoc}
         *
         * @param metadata The "compiled" representation of the mapping information
         * @param sessionFactory The session factory being created
         * @param serviceRegistry The session factory's service registry
         */
        @Override
        public void integrate(Metadata metadata,
                              SessionFactoryImplementor sessionFactory,
                              SessionFactoryServiceRegistry serviceRegistry) {
            log.debug("registering {} integrator...", getClass().getSimpleName());
            final EventListenerRegistry eventListenerRegistry = serviceRegistry.getService(EventListenerRegistry.class);
            eventListenerRegistry.appendListeners(EventType.PRE_INSERT, new TransactionProcessRegistrarEventListener());
            eventListenerRegistry.appendListeners(EventType.PRE_UPDATE, new TransactionProcessRegistrarEventListener());
        }

        /**
         * {@inheritDoc}
         *
         * @param sessionFactory The session factory being closed.
         * @param serviceRegistry That session factory's service registry
         */
        @Override
        public void disintegrate(SessionFactoryImplementor sessionFactory, SessionFactoryServiceRegistry serviceRegistry) {
            // intentionally do nothing
        }
    }

And finally, the listener itself (here you can retrieve the event, and the entity from it, using the getEvent method declared in the parent interface):

    package com.example.observer.action.before.create.book.validator;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import javax.enterprise.context.Dependent;
    import javax.inject.Inject;
    import io.quarkus.arc.Priority;
    import io.quarkus.arc.Unremovable;
    import lombok.Getter;
    import lombok.Setter;
    import lombok.extern.slf4j.Slf4j;
    import org.hibernate.event.spi.AbstractPreDatabaseOperationEvent;
    import org.hibernate.reactive.session.ReactiveSession;
    import com.example.annotation.Entity;
    import com.example.annotation.operation.Create;
    import com.example.model.Book;
    import com.example.observer.action.EventReactiveBeforeTransactionCompletionProcess;

    @Getter
    @Setter
    @Slf4j
    @Priority(10)
    @Entity(Book.class)
    @Create
    @Unremovable
    @Dependent
    public class BookValidator implements EventReactiveBeforeTransactionCompletionProcess {

        // populated by the registrar through the Lombok-generated setEvent
        private AbstractPreDatabaseOperationEvent event;

        /**
         * {@inheritDoc}
         *
         * @param session The session on which the transaction is preparing to complete.
         */
        @Override
        public CompletionStage<Void> doBeforeTransactionCompletion(ReactiveSession session) {
            log.debug("validating, enriching or everything you want with the book entity...");
            return CompletableFuture.completedStage(null);
        }
    }

That's all. If anyone knows a better implementation, please let me know.