I would like to propagate JTA state (= the transaction) between a transactional REST endpoint that emits a message to a reactive-messaging connector.
@Inject
@Channel("test")
Emitter<String> emitter;
@POST
@Transactional
public Response test() {
emitter.send("test");
}
and
@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {
@Inject
TransactionManager tm;
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.flatMapCompletionStage(message -> {
tm.getTransaction(); // = null
return message.ack();
})
.ignore();
}
}
As I understand, context-propagation is responsible for making the transaction available (see io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext
). The problem seems to be, that currentContext
gets created on subscription, which happens when the injection point (Emitter<String> emitter
) get its instance. Which is too early to properly capture the transaction.
What am I missing?
By the way, I am having the same problem when using @Incoming
/ @Outgoing
instead of the emitter. I have decided to give you this example because it is easy to understand and reproduce.
At the moment, you need to pass the current Transaction in the message metadata. Thus, it will be propagated to your different downstream components (as well as the connector).
Note that, Transaction tends to be attached to the request scope, which means that in your connector, it may already be too late to use it. So, make sure your endpoint is asynchronous and only returns when the emitted message is acknowledged.
Context Propagation is not going to help in this case as the underlying streams are built at startup time (at build time in Quarkus) so, there are no capture contexts.