I am using Akka Streams Kafka to pipe Kafka messages to a remote service. I want to guarantee that the service receives every message exactly once (i.e., both at-least-once and at-most-once delivery).
Here's the code I came up with:
private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                         topicPattern: String,
                         mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]): Unit = {
  val groupId = config.getString("group-id")
  implicit val materializer = ActorMaterializer()
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  implicit val timeout = Timeout(5.seconds) // timeout for the reply to the ask call below
  import system.dispatcher // the ExecutionContext used by the ask call below
  Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
    .map(message => (message, mapCommittableMessageToSinkMessage(message)))
    .mapAsync(1)(tuple => ask(subscriber, tuple._2).map(_ => tuple._1))
    .mapAsync(1)(message => message.committableOffset.commitScaladsl())
    .runWith(Sink.ignore)
}
As the code shows, the stream maps each message into a tuple of the original CommittableMessage and the transformed message that is passed to the subscriber (an actor that sends to the remote service). The purpose of the tuple is to commit the offset only after the subscriber completes processing.
Something about it just seems like an anti-pattern, but I'm not sure of a better way to do it. Any suggestions on a better way?
Thanks!
One way to keep this cleaner and easier to change could be to use the GraphDSL. It would allow you to spawn a branch of your graph carrying over the Committable part of your message, whilst another branch performs all the needed business logic.
An example graph could be (omitting all the boilerplate for better clarity):
val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))

val businessLogic = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))

val snk = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => message.committableOffset.commitScaladsl())
  .to(Sink.ignore) // look into Sink.foldAsync for a more compact re-write of this part
src ~> broadcast
broadcast ~> businessLogic ~> zip.in0
broadcast ~> zip.in1
zip.out.map(_._2) ~> snk
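Filling in the omitted boilerplate, the whole graph could be wired up roughly like this. This is a sketch, not a definitive implementation: it assumes the same 0.x akka-stream-kafka API the question uses (commitScaladsl, ActorMaterializer), and that consumerSettings, topicPattern, subscriber, mapCommittableMessageToSinkMessage, an implicit Timeout, and an ExecutionContext are in scope as in the question's startFlow:

```scala
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.pattern.ask
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Zip}

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  // one source, fanned out into a business-logic branch and a pass-through branch
  val src       = b.add(Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern)))
  val broadcast = b.add(Broadcast[CommittableMessage[String, String]](2))
  // zip re-joins the two branches so a message is only committed
  // after the subscriber has replied for it
  val zip       = b.add(Zip[Any, CommittableMessage[String, String]]())

  val businessLogic = Flow[CommittableMessage[String, String]]
    .mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))

  val snk = Flow[CommittableMessage[String, String]]
    .mapAsync(1)(message => message.committableOffset.commitScaladsl())
    .to(Sink.ignore)

  src ~> broadcast
         broadcast ~> businessLogic ~> zip.in0
         broadcast ~> zip.in1
                      zip.out.map(_._2) ~> snk

  ClosedShape
})

graph.run() // requires an implicit materializer in scope
```

Because Zip emits pairwise, the commit branch stays in lockstep with the business-logic branch, preserving the at-least-once ordering guarantee of the original linear stream.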
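As for the Sink.foldAsync hint in the snippet above, the commit stage could be collapsed into a single sink, roughly like this (again a sketch against the 0.x API, where commitScaladsl returns a Future[Done]):

```scala
import akka.Done
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// folds over the stream, committing each offset as it arrives;
// the accumulator (Done) is just a placeholder
val snk: Sink[CommittableMessage[String, String], Future[Done]] =
  Sink.foldAsync(Done: Done) { (_, message: CommittableMessage[String, String]) =>
    message.committableOffset.commitScaladsl()
  }
```

This replaces the mapAsync(1) + Sink.ignore pair with one stage, and its materialized Future[Done] also gives you a handle to observe stream completion or failure.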