I'm playing with Alpakka and its JMS connector to dequeue data from Oracle AQ. I could come up with the very basic implementation below by following this guide.
My question is how I can make it transactional, so that I can guarantee that my message won't be lost if an exception is thrown.
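import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.jms.JmsSourceSettings
import akka.stream.alpakka.jms.scaladsl.JmsSource
import akka.stream.scaladsl.Sink
import oracle.jms.AQjmsFactory

object ConsumerApp extends App {
  implicit val system: ActorSystem = ActorSystem("actor-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // getOracleDataSource() is my own helper (not shown here)
  val connectionFactory = AQjmsFactory.getConnectionFactory(getOracleDataSource())

  val out = JmsSource.textSource(
    JmsSourceSettings(connectionFactory).withQueue("My_Queue")
  )

  val sink = Sink.foreach { message: String =>
    println("in sink: " + message)
    throw new Exception("") // !!! MESSAGE IS LOST !!!
  }

  // The materializer is picked up implicitly
  out.runWith(sink)
}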
If it were PL/SQL, the solution would be like this:
DECLARE
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW (44);
  msg                SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
  DBMS_AQ.dequeue (
    queue_name         => 'My_Queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => msg,
    msgid              => message_handle
  );
  -- do something with the message
  COMMIT;
END;
The default behavior when a stream stage fails is to shut down the entire stream. You'll have to decide how you want to handle errors in the stream. One approach, for example, is to restart the stream with a backoff strategy.
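As a rough sketch of the restart-with-backoff idea, assuming Akka Streams 2.5.4 or later (where `RestartSource` was introduced): the helper name `withRestarts` and the backoff values below are illustrative choices, not something prescribed by Akka or Alpakka.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.duration._

object RestartSketch {
  // Hypothetical helper: wraps a source factory so that the inner source is
  // recreated, after an increasing delay, whenever it fails or completes.
  def withRestarts[T](factory: () => Source[T, _]): Source[T, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 1.second,   // delay before the first restart (illustrative)
      maxBackoff = 30.seconds, // upper bound on the delay (illustrative)
      randomFactor = 0.2       // jitter, so restarts don't synchronize
    )(factory)
}
```

Only stages created inside the factory are restarted, so the failing step has to live there, for example `RestartSketch.withRestarts(() => out.map(process))` with a hypothetical `process` function, rather than in a `Sink.foreach` attached outside the wrapper. Note that `withBackoff` also restarts when the inner source completes, which is usually fine for a queue consumer that is meant to run indefinitely.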
Also, since you're using the Alpakka JMS connector, set the acknowledgement mode to ClientAcknowledge (this is available since Alpakka 0.15). With this configuration, messages that aren't acknowledged can be delivered again through the JMS source. For example:
val jmsSource: Source[Message, NotUsed] = JmsSource(
  JmsSourceSettings(connectionFactory)
    .withQueue("My_Queue")
    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
)

val result = jmsSource
  .map {
    case textMessage: TextMessage =>
      val text = textMessage.getText
      textMessage.acknowledge()
      text
  }
  .runForeach(println)
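To get closer to the dequeue-then-COMMIT behavior of the PL/SQL version, one option is to acknowledge only after your own processing has succeeded. The following is a minimal sketch of that idea, not the connector's prescribed pattern: `acknowledging` and `process` are hypothetical names, and the source passed in is assumed to be a JMS source configured with `ClientAcknowledge` as in the example above.

```scala
import akka.stream.scaladsl.Source
import javax.jms.{Message, TextMessage}

object AckAfterProcessing {
  // Hypothetical helper: runs `process` on each text message and acknowledges
  // the message only if `process` returned normally.
  def acknowledging[Mat](jmsSource: Source[Message, Mat])(
      process: String => Unit
  ): Source[String, Mat] =
    jmsSource.map {
      case textMessage: TextMessage =>
        val text = textMessage.getText
        process(text)             // if this throws, acknowledge() is never reached
        textMessage.acknowledge() // the "commit": the message will not be redelivered
        text
      case other =>
        // Non-text messages are not expected on this queue; fail without acknowledging.
        throw new IllegalArgumentException(
          s"Unexpected message type: ${other.getClass.getName}"
        )
    }
}
```

If `process` throws, the stream fails with the message still unacknowledged, so it should become eligible for redelivery once the JMS session is closed or recovered. Keep in mind that in JMS client-acknowledge mode, acknowledging one message also acknowledges every message previously consumed on the same session.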