scala apache-kafka akka akka-stream akka-kafka

Why does a Kafka client with auto-commit enabled commit the latest produced message's offset when the consumer closes, even though the message has not been consumed yet?


Detailed explanation:

I have a simple Scala application with an Akka actor that consumes messages from a Kafka topic and, if an exception occurs during processing, produces the message back to the same topic.

TestActor.scala

  override protected def processMessage(messages: Seq[ConsumerRecord[String, String]]): Future[Done] = {
    Future.sequence(messages.map(message => {
      logger.info(s"--CONSUMED: offset: ${message.offset()} message: ${message.value()}")
      // in actual implementation, some process is done here and if an exception occurs, the message is sent to the same topic as seen below
      sendToExceptionTopic(Instant.now().toEpochMilli)
      Thread.sleep(1000)
      Future(Done)
    })).transformWith(_ => Future(Done))
  }

The actor starts every minute, runs for 20 seconds, and then stops.

Starter.scala

  def init(): Unit = {
    exceptionManagerActor ! InitExceptionActors

    system.scheduler.schedule(2.second, 60.seconds) {
      logger.info("started consuming messages")
      exceptionManagerActor ! ConsumeExceptions
    }
  }

ExceptionManagerActor.scala

  private def startScheduledActor(actorRef: ActorRef): Unit = {
    actorRef ! Start

    context.system.scheduler.scheduleOnce(20.seconds) {
      logger.info("stopping consuming messages")
      actorRef ! Stop
    }
  }

BaseActorWithAutoCommit.scala

  override def receive: Receive = {
    case Start =>
      consumerBase = consumer
        .groupedWithin(20, 2000.millisecond)
        .mapAsyncUnordered(10)(processMessage)
        .toMat(Sink.seq)(DrainingControl.apply)
        .run()

    case Stop =>
      consumerBase.drainAndShutdown().transformWith {
        case Success(value) =>
          logger.info("actor stopped")
          Future(value)
        case Failure(ex) =>
          logger.error("error: ", ex)
          Future.failed(ex)
      }
    //Await.result(consumerBase.drainAndShutdown(), 1.minute)
  }

With this configuration, when the consumer stops, the Kafka client commits the offset of the most recently produced message as if it had been consumed.

Example logs:

14:28:48.868 INFO - started consuming messages
14:28:50.945 INFO - --CONSUMED: offset: 97 message: 1
14:28:51.028 INFO - ----PRODUCED: offset: 98 message: 1643542130945
...
14:29:08.886 INFO - stopping consuming messages
14:29:08.891 INFO - --CONSUMED: offset: 106 message: 1643542147106
14:29:08.895 INFO - ----PRODUCED: offset: 107 message: 1643542148891 <------ this message was lost
14:29:39.946 INFO - actor stopped
14:29:39.956 INFO - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://test-consumer/system/Materializers/StreamSupervisor-2/$$a#1541548736] to Actor[akka://test-consumer/system/kafka-consumer-1#914599016] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://test-consumer/system/kafka-consumer-1#914599016] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14:29:48.866 INFO - started consuming messages <----- the message with offset 107 was expected to be consumed in this cycle, but it was not
14:30:08.871 INFO - stopping consuming messages
14:30:38.896 INFO - actor stopped

As you can see from the logs, the message with offset 107 was produced but was not consumed in the next cycle.

I am not an expert on Akka actors and don't know whether this behavior comes from Kafka or Akka, but it seems to me that it is related to auto-commit.


Dependency versions used:

lazy val versions = new {
  val akka = "2.6.13"
  val akkaHttp = "10.1.9"
  val alpAkka = "2.0.7"
  val logback = "1.2.3"
  val apacheCommons = "1.7"
  val json4s = "3.6.7"
}

libraryDependencies ++= {
  Seq(
    "com.typesafe.akka" %% "akka-slf4j" % versions.akka,
    "com.typesafe.akka" %% "akka-stream-kafka" % versions.alpAkka,
    "com.typesafe.akka" %% "akka-http" % versions.akkaHttp,
    "com.typesafe.akka" %% "akka-protobuf" % versions.akka,
    "com.typesafe.akka" %% "akka-stream" % versions.akka,
    "ch.qos.logback" % "logback-classic" % versions.logback,
    "org.json4s" %% "json4s-jackson" % versions.json4s,
    "org.apache.commons" % "commons-text" % versions.apacheCommons,
  )
}

Example source code and steps to reproduce the issue are available in this repository.


Solution

  • As far as Kafka is concerned, the message is consumed as soon as Alpakka Kafka reads it from Kafka.

    This is before the actor inside of Alpakka Kafka has emitted it to a downstream consumer for application level processing.

    Kafka auto-commit (enable.auto.commit = true) will thus result in the offset being committed before the message has been sent to your actor.

    The Kafka docs on offset management do (as of this writing) refer to enable.auto.commit as having an at-least-once semantic, but as noted in my first paragraph, this is an at-least-once delivery semantic, not an at-least-once processing semantic. The latter is an application level concern, and accomplishing that requires delaying the offset commit until processing has completed.

    The Alpakka Kafka docs have an involved discussion about at-least-once processing: in this case, at-least-once processing will likely entail introducing manual offset committing and replacing mapAsyncUnordered with mapAsync (since mapAsyncUnordered in conjunction with manual offset committing means that your application can only guarantee that a message from Kafka gets processed at-least-zero times).
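As a rough illustration of the manual-commit approach described above, here is a minimal sketch, not the asker's actual code. The broker address, group id, topic name, and the `processRecord` helper are hypothetical placeholders, and the sketch processes records one at a time rather than in `groupedWithin` batches. It assumes an Alpakka Kafka version where `DrainingControl.apply` can be passed to `toMat`, as the question's own code already does.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.{ExecutionContext, Future}

object AtLeastOnceSketch {
  implicit val system: ActorSystem = ActorSystem("test-consumer")
  implicit val ec: ExecutionContext = system.dispatcher

  // Hypothetical connection details; replace with your own.
  val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("exception-consumer")
      // Make sure the Kafka client does not commit offsets on its own.
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

  // Hypothetical stand-in for the application-level processing.
  def processRecord(record: ConsumerRecord[String, String]): Future[Done] =
    Future.successful(Done)

  def start(): DrainingControl[Done] =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("exception-topic"))
      // mapAsync keeps results in order, so an offset reaches the committer
      // only after every earlier message has finished processing.
      .mapAsync(1) { msg =>
        processRecord(msg.record).map(_ => msg.committableOffset)
      }
      // Offsets are committed only for messages that were actually processed.
      .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
      .run()
}
```

With a stream shaped like this, calling `drainAndShutdown()` on the returned control should let in-flight messages finish and their offsets be committed before the consumer closes, so a record that was fetched but never processed is expected to be delivered again in the next cycle.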

    In Alpakka Kafka, a broad taxonomy of message processing guarantees: