scala · apache-kafka · alpakka

Kafka consumers (same group-id) stuck always reading from the same partition


I have 2 consumers running with the same group-id, reading from a topic with 3 partitions and parsing messages with KafkaAvroDeserializer. The consumer has these settings:

    import scala.jdk.CollectionConverters._ // scala.collection.JavaConverters._ on Scala 2.12

    import akka.actor.ActorSystem
    import akka.kafka.ConsumerSettings
    import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
    import org.apache.avro.specific.SpecificRecordBase
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

    def avroConsumerSettings[T <: SpecificRecordBase](schemaRegistry: String, bootstrapServer: String, groupId: String)(implicit
        actorSystem: ActorSystem): ConsumerSettings[String, T] = {
      val kafkaAvroSerDeConfig = Map[String, Any](
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistry,
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString
      )
      // Configure the Avro deserializer for record values (isKey = false)
      val kafkaAvroDeserializer = new KafkaAvroDeserializer()
      kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
      val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[T]]

      ConsumerSettings(actorSystem, new StringDeserializer, deserializer)
        .withBootstrapServers(bootstrapServer)
        .withGroupId(groupId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    }

I tried sending a malformed message to test error handling, and now my consumer is stuck: because I'm using RestartSource.onFailuresWithBackoff, it keeps restarting and retrying the read from the same partition. What is strange to me (AFAIK two consumers in the same group cannot read from the same partition) is that if I run another consumer, it gets stuck as well, because it also ends up reading from the partition that holds the unreadable message.
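For context, this is roughly how the source is wired; this is a minimal sketch, not my exact code, and MyRecord, the topic name, and the broker/schema-registry addresses are placeholders:

    import akka.actor.ActorSystem
    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import akka.stream.RestartSettings
    import akka.stream.scaladsl.{RestartSource, Sink}

    import scala.concurrent.duration._

    implicit val system: ActorSystem = ActorSystem("consumer")

    // MyRecord: placeholder for a generated Avro SpecificRecordBase class
    val settings = avroConsumerSettings[MyRecord]("http://localhost:8081", "localhost:9092", "my-group")

    RestartSource
      .onFailuresWithBackoff(RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)) { () =>
        // The KafkaAvroDeserializer runs inside the consumer's poll, so a malformed record
        // fails this source; each restart creates a new consumer that re-reads the same uncommitted offset.
        Consumer.plainSource(settings, Subscriptions.topics("my-topic"))
      }
      .runWith(Sink.foreach(record => println(record.value())))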

Can someone help me understand what I am doing wrong?


Solution

  • When you restart the Kafka source after a failure, that results in a new consumer being created; eventually the consumer in the failed source is declared dead by Kafka, triggering a rebalance. In that rebalance, there are no external guarantees of which consumer in the group will be assigned which partition. This would explain why your other consumer in the group reads that partition.

    The issue of a poison message derailing consumption like this is a major reason I've developed a preference for treating keys and values from Kafka as blobs: use the ByteArrayDeserializer and do the deserialization myself in the stream. That gives me the ability to record that there was a malformed message in the topic (e.g. by logging it; producing it to a dead-letter topic for later inspection also works) and move on by committing the offset. Scala's Either is particularly good for carrying either the parsed value or the malformed message straight through to the committer, roughly as in the sketch below.
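
    A minimal sketch of that approach, assuming Alpakka Kafka's committableSource and Committer.sink; the topic name, addresses, and group id are placeholders:

        import akka.actor.ActorSystem
        import akka.kafka.scaladsl.{Committer, Consumer}
        import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
        import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
        import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

        import scala.jdk.CollectionConverters._
        import scala.util.Try

        implicit val system: ActorSystem = ActorSystem("consumer")

        // Values arrive as raw bytes, so a malformed record can no longer fail the consumer itself.
        val rawSettings: ConsumerSettings[String, Array[Byte]] =
          ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
            .withBootstrapServers("localhost:9092") // placeholder broker address
            .withGroupId("my-group")                // placeholder group id

        // The same Avro deserializer as in the question, but invoked inside the stream.
        val avroDeserializer = new KafkaAvroDeserializer()
        avroDeserializer.configure(
          Map[String, Any](
            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081", // placeholder registry URL
            KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> "true"
          ).asJava,
          false
        )

        Consumer
          .committableSource(rawSettings, Subscriptions.topics("my-topic")) // placeholder topic name
          .map { msg =>
            Try(avroDeserializer.deserialize(msg.record.topic(), msg.record.value())).toEither match {
              case Right(record) =>
                system.log.info(s"Processing $record") // replace with real business logic
              case Left(error) =>
                // Record the poison message (or produce it to a dead-letter topic) and keep going.
                system.log.error(error, s"Skipping malformed record at ${msg.record.partition()}/${msg.record.offset()}")
            }
            msg.committableOffset // committed either way, so the group moves past the bad offset
          }
          .runWith(Committer.sink(CommitterSettings(system)))

    With this wiring a malformed record is logged and its offset committed like any other, so neither this consumer nor any other member of the group gets pinned to that partition.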