scalaunit-testingapache-kafkaakka-stream

How do I test that an offset has been committed or not to Kafka


I have an Akka Stream Kafka source that is reading from a Kafka topic.

I have a simple task that is allowing disabling commit of the message offset. The commit is usually done calling commitScaladsl.

My problem is I don't know how to test if the offset has been committed or not.

We usually use EmbeddedKafka for testing, but I haven't figured out a way of asking for the last committed offset.

This is an example of the test I have written:

  "KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes

      withRunningKafka {

        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()

        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }

Now I want to add a check for the offset being committed or not.


Solution

  • I finally solved it using a ConsumerInterceptor, defined as:

    class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {
      override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records
    
      override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
        import scala.collection.JavaConverters._
        OffsetRecorder.add(offsets.asScala)
      }
    
      override def close(): Unit = {}
    
      override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear
    
    }
    

    onCommit is called when the commit is done, in this case I just record it. I use configure method to have empty records at the start of each test.

    Then, when creating the consumer settings for the source, I add the interceptor as a property:

      ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
        .withBootstrapServers(s"localhost:${kafkaPort}")
        .withGroupId("group-id")
        .withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")