I have an alpakka JMS source -> kafka sink kind of a flow. I'm looking at the alpakka jms consumer documentation and trying to figure out what kind of delivery guarantees this gives me.
From https://doc.akka.io/docs/alpakka/current/jms/consumer.html
val result: Future[immutable.Seq[javax.jms.Message]] =
jmsSource
.take(msgsIn.size)
.map { ackEnvelope =>
ackEnvelope.acknowledge()
ackEnvelope.message
}
.runWith(Sink.seq)
I'm hoping that the way this actually works is that messages will only be ack'ed once the sinking succeeds (for at-least-once delivery guarantees), but I can't rely on wishful thinking.
Given that alpakka does not seem to utilize any kind of own state that persists across restarts, I can't think how I'd be able to get exactly-once guarantees a'la flink here, but can I at least count on at-least-once, or would I have to (somehow) ack in a map of a kafka producer flexiFlow (https://doc.akka.io/docs/alpakka-kafka/current/producer.html#producer-as-a-flow)
Thanks, Fil
In that stream, the ack will happen before messages are added to the materialized sequence and before result
becomes available for you to do anything (i.e. the Future
completes). It therefore would be at-most-once
.
To delay the ack until some processing has succeeded, the easiest approach is to keep what you're doing with the messages in the flow rather than materialize a future. The Alpakka Kafka producer supports a pass-through element which could be the JMS message:
val topic: String = ???
def javaxMessageToKafkaMessage[Key, Value](
ae: AckEnvelope,
kafkaKeyFor: javax.jms.Message => Key,
kafkaValueFor: javax.jms.Message => Value
): ProducerMessage.Envelope[Key, Value, javax.jms.Message] = {
val key = kafkaKeyFor(ae.message)
val value = kafkaValueFor(ae.message)
ProducerMessage.single(new ProducerRecord(topic, key, value), ae)
}
// types K and V are unspecified...
jmsSource
.map(
javaxMessageToKafkaMessage[K, V](
_,
{ _ => ??? },
{ _ => ??? }
)
)
.via(Producer.flexiFlow(producerSettings))
.to(
Sink.foreach { results =>
val msg = results.passThrough
msg.acknowledge()
}
)(Keep.both)
run
ning this stream will materialize as a tuple of a JmsConsumerControl
with a Future[Done]
. Not being familiar with JMS, I don't know how a shutdown
of the consumer control would interact with the acks.