apache-kafka exactly-once

Kafka - Losing messages even if app is configured for exactly once and highest durability


In rare cases I receive duplicate messages, even though everything is configured for high durability and we use the exactly-once configuration.

The application context and the test scenario that triggers the issue are described below.

Kafka Cluster Setup

3 x Kafka Brokers (1 on host1, 2 on host2 and 3 on host3)

3 x Zookeeper instances (1 on host1, 2 on host2 and 3 on host3)

Kafka configuration


    broker.id=1,2,3
    num.network.threads=2
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/home/kafka/logs/kafka
    min.insync.replicas=3
    transaction.state.log.min.isr=3
    default.replication.factor=3
    log.retention.minutes=600
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=host1:2181,host2:2181,host3:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=1000
    log.message.timestamp.type=LogAppendTime
    delete.topic.enable=true
    auto.create.topics.enable=false
    unclean.leader.election.enable=false

ZooKeeper configuration


    tickTime=2000
    dataDir=/home/kafka/logs/zk
    clientPort=2181
    maxClientCnxns=0
    initLimit=5
    syncLimit=2
    server.1=host1:2888:3888
    server.2=host2:2888:3888
    server.3=host3:2888:3888
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=24

Kafka internal topics description


    Topic:__transaction_state       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
          Topic: __transaction_state     Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3

    Topic:__consumer_offsets       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
          Topic: __consumer_offsets       Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3

Application topics


    Topic input-event
    Topic:input-event     PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
          Topic: input-event     Partition: 0   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
          Topic: input-event     Partition: 1   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
          Topic: input-event     Partition: 2   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3

    Topic output-event
    Topic:output-event       PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
          Topic: output-event       Partition: 0   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
          Topic: output-event       Partition: 1   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
          Topic: output-event       Partition: 2   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

Application consumer properties


    o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
                  auto.commit.interval.ms = 5000
                  auto.offset.reset = earliest
                  bootstrap.servers = [host1:9092, host2:9092, host3:9092]
                  check.crcs = true
                  client.id =
                  connections.max.idle.ms = 540000
                  default.api.timeout.ms = 60000
                  enable.auto.commit = false
                  exclude.internal.topics = true
                  fetch.max.bytes = 134217728
                  fetch.max.wait.ms = 500
                  fetch.min.bytes = 1
                  group.id = groupId
                  heartbeat.interval.ms = 3000
                  interceptor.classes = []
                  internal.leave.group.on.close = true
                  isolation.level = read_committed
                  key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
                  max.partition.fetch.bytes = 134217728
                  max.poll.interval.ms = 300000
                  max.poll.records = 1
                  metadata.max.age.ms = 300000
                  metric.reporters = []
                  metrics.num.samples = 2
                  metrics.recording.level = INFO
                  metrics.sample.window.ms = 30000
                  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
                  receive.buffer.bytes = 65536
                  reconnect.backoff.max.ms = 1000
                  reconnect.backoff.ms = 1000
                  request.timeout.ms = 30000
                  retry.backoff.ms = 1000
                  sasl.client.callback.handler.class = null
                  sasl.jaas.config = null
                  sasl.kerberos.kinit.cmd = /usr/bin/kinit
                  sasl.kerberos.min.time.before.relogin = 60000
                  sasl.kerberos.service.name = null
                  sasl.kerberos.ticket.renew.jitter = 0.05
                  sasl.kerberos.ticket.renew.window.factor = 0.8
                  sasl.login.callback.handler.class = null
                  sasl.login.class = null
                  sasl.login.refresh.buffer.seconds = 300
                  sasl.login.refresh.min.period.seconds = 60
                  sasl.login.refresh.window.factor = 0.8
                  sasl.login.refresh.window.jitter = 0.05
                  sasl.mechanism = GSSAPI
                  security.protocol = PLAINTEXT
                  send.buffer.bytes = 131072
                  session.timeout.ms = 10000
                  ssl.cipher.suites = null
                  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                  ssl.endpoint.identification.algorithm = https
                  ssl.key.password = null
                  ssl.keymanager.algorithm = SunX509
                  ssl.keystore.location = null
                  ssl.keystore.password = null
                  ssl.keystore.type = JKS
                  ssl.protocol = TLS
                  ssl.provider = null
                  ssl.secure.random.implementation = null
                  ssl.trustmanager.algorithm = PKIX
                  ssl.truststore.location = null
                  ssl.truststore.password = null
                  ssl.truststore.type = JKS
                  value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Application producer properties

    bootstrapServers = "host1, host2, host3"
    transactionIdPrefix = "my-producer-"${instance}
    "enable.idempotence" = "true"
    "acks" = "all"
    "retries" = "2147483647"
    "transaction.timeout.ms" = "10000"
    "max.in.flight.requests.per.connection" = "1"
    "reconnect.backoff.max.ms" = "1000"
    "reconnect.backoff.ms" = "1000"
    "retry.backoff.ms" = "1000"

Application handling commits

Using KafkaTransactionManager (spring-kafka 2.2.8.RELEASE), we start a transaction, write the message to the output topic with KafkaTemplate, and send the consumer offsets as part of the same transaction.

Test expected/actual

We expected exactly 32,000 messages on the output topic. However, sometimes we end up with at least one duplicate.

At other times we end up with exactly 32,000 messages and everything is correct.
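
For anyone reproducing this, one simple way to confirm a duplicate is to collect an identifier for every record read from the output topic and look for repeats. The sketch below is a hypothetical helper, not part of the application above. It assumes each message carries a unique id (for example in the key or payload), which the setup above does not state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for checking test output; not taken from the original application.
final class DuplicateCheck {

    /** Returns each id that occurs more than once, in the order its first repeat is seen. */
    static List<String> findDuplicates(List<String> messageIds) {
        Set<String> seen = new HashSet<>();
        Set<String> reported = new HashSet<>();
        List<String> duplicates = new ArrayList<>();
        for (String id : messageIds) {
            // Set.add returns false when the element was already present.
            if (!seen.add(id) && reported.add(id)) {
                duplicates.add(id);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Assumed ids; in a real run these would come from consuming output-event.
        List<String> ids = Arrays.asList("evt-1", "evt-2", "evt-2", "evt-3");
        System.out.println(findDuplicates(ids)); // prints [evt-2]
    }
}
```

An empty result together with a total count of 32,000 would correspond to the "everything is correct" case.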


Solution

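  • The issue was that the transactional id was not set correctly at the topic-partition level, so two producers wrote the same message to the same partition. Kafka fences a stale producer only when a newer producer registers with the same transactional id, so producers that use different ids for the same partition can both commit.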

    This was a good read: https://tgrez.github.io/posts/2019-04-13-kafka-transactions.html
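
To illustrate why the id has to be tied to the topic-partition, here is a minimal, self-contained sketch. It does not use the Kafka client. `ToyCoordinator` is a made-up, heavily simplified stand-in for the broker-side epoch check, and the id format (prefix + group + "." + topic + "." + partition) is an assumption chosen for illustration. The instance-specific prefixes `my-producer-1-` and `my-producer-2-` are also hypothetical values.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: real fencing is done by the Kafka transaction coordinator.
final class TransactionalIdSketch {

    /** Builds an id from prefix, group, topic and partition (format is an assumption). */
    static String transactionalId(String prefix, String groupId, String topic, int partition) {
        return prefix + groupId + "." + topic + "." + partition;
    }

    /** Simplified coordinator: tracks the latest epoch per transactional id. */
    static final class ToyCoordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        /** Registering bumps the epoch for this id and returns the new epoch. */
        int register(String transactionalId) {
            return epochs.merge(transactionalId, 1, Integer::sum);
        }

        /** Only the holder of the newest epoch for an id may commit. */
        boolean canCommit(String transactionalId, int epoch) {
            Integer current = epochs.get(transactionalId);
            return current != null && current == epoch;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();

        // Shared prefix: the old and new owner of input-event partition 0 derive the same id.
        String oldOwner = transactionalId("my-producer-", "groupId", "input-event", 0);
        String newOwner = transactionalId("my-producer-", "groupId", "input-event", 0);
        int oldEpoch = coordinator.register(oldOwner);
        coordinator.register(newOwner);
        // The old owner is fenced because a newer epoch exists for the same id.
        System.out.println("shared prefix, old owner may commit: "
                + coordinator.canCommit(oldOwner, oldEpoch));

        // Per-instance prefix: the ids differ, so neither producer fences the other.
        String first = transactionalId("my-producer-1-", "groupId", "input-event", 0);
        String second = transactionalId("my-producer-2-", "groupId", "input-event", 0);
        int firstEpoch = coordinator.register(first);
        coordinator.register(second);
        System.out.println("per-instance prefix, old owner may commit: "
                + coordinator.canCommit(first, firstEpoch));
    }
}
```

In the second case both producers can commit a transaction for the same input record, which matches the kind of duplicate described in the question.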