apache-kafka jms akka-stream alpakka

Alpakka JMS client acknowledgement mode delivery guarantee


I have a flow that goes from an Alpakka JMS source to a Kafka sink. I'm looking at the Alpakka JMS consumer documentation and trying to figure out what delivery guarantees this gives me.

From https://doc.akka.io/docs/alpakka/current/jms/consumer.html

val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { ackEnvelope =>
      ackEnvelope.acknowledge()
      ackEnvelope.message
    }
    .runWith(Sink.seq)

I'm hoping that messages are only acked once the sink has succeeded (which would give at-least-once delivery), but I can't rely on wishful thinking.

Given that Alpakka does not seem to keep any state of its own that persists across restarts, I can't see how I'd get exactly-once guarantees à la Flink here. But can I at least count on at-least-once, or would I have to (somehow) ack in a map after a Kafka producer flexiFlow (https://doc.akka.io/docs/alpakka-kafka/current/producer.html#producer-as-a-flow)?

Thanks, Fil


Solution

  • In that stream, the ack happens before the message is added to the materialized sequence, and therefore before result (the Future) completes and you can do anything with the messages. If anything fails after the ack, the message has already been acknowledged and will not be redelivered, so this is at-most-once.

    To delay the ack until some processing has succeeded, the easiest approach is to keep what you're doing with the messages in the flow rather than materializing a future. The Alpakka Kafka producer supports a pass-through element, which here can be the AckEnvelope wrapping the JMS message:

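    import akka.kafka.ProducerMessage
    import akka.kafka.scaladsl.Producer
    import akka.stream.alpakka.jms.AckEnvelope
    import akka.stream.scaladsl.{Keep, Sink}
    import org.apache.kafka.clients.producer.ProducerRecord

    val topic: String = ???

    // Wraps the Kafka record together with the AckEnvelope as pass-through,
    // so the envelope is still available after the producer has written.
    def javaxMessageToKafkaMessage[Key, Value](
      ae: AckEnvelope,
      kafkaKeyFor: javax.jms.Message => Key,
      kafkaValueFor: javax.jms.Message => Value
    ): ProducerMessage.Envelope[Key, Value, AckEnvelope] = {
      val key = kafkaKeyFor(ae.message)
      val value = kafkaValueFor(ae.message)
      ProducerMessage.single(new ProducerRecord(topic, key, value), ae)
    }

    // types K and V are unspecified; jmsSource and producerSettings
    // are assumed to be defined elsewhere
    jmsSource
      .map(
        javaxMessageToKafkaMessage[K, V](
          _,
          { _ => ??? },
          { _ => ??? }
        )
      )
      .via(Producer.flexiFlow(producerSettings))
      .toMat(
        // Ack only after the Kafka producer has emitted its result
        Sink.foreach { results =>
          val envelope = results.passThrough
          envelope.acknowledge()
        }
      )(Keep.both)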
    

    Running this stream materializes a tuple of a JmsConsumerControl and a Future[Done]. Not being familiar with JMS, I don't know how shutting down the consumer control would interact with outstanding acks.
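
To make the difference between the two ack orderings concrete, here is a minimal sketch in plain Scala with no Akka or JMS dependency. Everything in it is hypothetical and only for illustration: FakeEnvelope stands in for an AckEnvelope, and publish stands in for the Kafka write and is hard-coded to fail for one payload. It reports which messages end up acked even though their publish failed, i.e. which ones would be lost.

```scala
// Hypothetical stand-in for a JMS message under client acknowledgement.
final class FakeEnvelope(val payload: String) {
  var acked = false
  def acknowledge(): Unit = acked = true
}

object AckOrdering {
  // Simulated downstream write that fails for one specific payload.
  def publish(payload: String): Unit =
    if (payload == "b") throw new RuntimeException(s"publish failed for $payload")

  // Ack first, then publish: a failed publish leaves an already-acked message.
  def ackThenPublish(e: FakeEnvelope): Unit = {
    e.acknowledge()
    publish(e.payload)
  }

  // Publish first, then ack: a failed publish leaves the message unacked,
  // so a broker would be free to redeliver it.
  def publishThenAck(e: FakeEnvelope): Unit = {
    publish(e.payload)
    e.acknowledge()
  }

  // Runs `step` over fresh envelopes and returns the payloads whose
  // publish failed although they were acked (these would be lost).
  def lost(step: FakeEnvelope => Unit): List[String] = {
    val envelopes = List("a", "b", "c").map(new FakeEnvelope(_))
    envelopes.flatMap { e =>
      val failed = scala.util.Try(step(e)).isFailure
      if (failed && e.acked) Some(e.payload) else None
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"ack-then-publish lost: ${lost(ackThenPublish)}")  // List(b)
    println(s"publish-then-ack lost: ${lost(publishThenAck)}")  // List()
  }
}
```

The first ordering corresponds to the stream in the question (at-most-once), the second to acking in the sink after the producer flow (at-least-once, with possible duplicates on redelivery).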