Tags: python, apache-kafka

Kafka consumer does not receive messages published while it was offline, despite using the same group.id


Problem Description

I have a project using Kafka in KRaft mode, with two microservices. I’m running Kafka with 3 brokers and 3 controllers, using the following Docker image: confluentinc/cp-kafka:7.8.0

Here’s the scenario:

  1. Both services are running.
  2. Service 1 sends messages: m1, m2, m3.
  3. Service 2 receives and processes all three messages successfully.
  4. I then shut down Service 2.
  5. While Service 2 is offline, Service 1 sends message m4.
  6. I restart Service 2.
  7. Although the Kafka consumer in Service 2 is re-initialized correctly, it does not receive message m4.
  8. Service 1 then sends message m5, and Service 2 successfully consumes m5.

The message m4 is completely lost on the consumer side — it is never received or processed.

Consumer Configurations

In the non-working version, the consumer is initialized like this:

consumer = KafkaConnection(
    group_id="wins-consumer-group-cost-service-01",
    offset_reset="latest"
)

When I change the consumer to use a dynamic group.id and offset_reset='earliest', the message is received:

consumer = KafkaConnection(
    group_id=f"wins-consumer-group-cost-service-{uuid.uuid4().hex[:4]}",
    offset_reset="earliest",
    enable_auto_commit=False,
    auto_commit_interval_ms=1000
)

However, using a new group ID every time prevents me from maintaining durable state across restarts, and is not practical in production.
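A note on semantics, as far as I understand them: auto.offset.reset only applies when the group has no committed offset for a partition (or the committed offset is out of range). With a stable group.id, a restarted consumer resumes from the last committed offset regardless of the reset policy, so offset_reset="latest" should only skip m4 if no offset was ever successfully committed. A minimal sketch of the intended configuration, assuming KafkaConnection wraps confluent-kafka (the wrapper itself is not shown here) and with hypothetical broker addresses:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",  # hypothetical
    "group.id": "wins-consumer-group-cost-service-01",  # stable group id
    # Only consulted when the group has no committed offset for a partition:
    "auto.offset.reset": "latest",
    "enable.auto.commit": False,  # commit manually after processing
})
consumer.subscribe(["eys_device_wins_updated_defSourceAttrs"])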

Consumer code block:

def sourceAttr_consumer():
    consumer = None
    retry_count = 0
    topic_name = 'eys_device_wins_updated_defSourceAttrs'
    try:
        while True:
            # Recreate the consumer if it doesn't exist or has been closed
            if consumer is None:
                try:
                    log.info("Creating new Kafka consumer...")
                    consumer = KafkaConnection(
                        group_id="wins-consumer-group-cost-service-01",
                        offset_reset="latest",
                    )

                    # Create the topic if it doesn't exist
                    if not consumer.topic_exists(topic_name):
                        consumer.add_topic(topic_name)
                    consumer = consumer.create_consumer()
                    consumer.subscribe([topic_name])
                    log.info(f"Subscribed to Kafka topic: {topic_name}")
                except Exception as conn_ex:
                    log.error(f"Failed to create Kafka consumer: {conn_ex}")
                    consumer = None
                    time.sleep(5)
                    continue

            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                log.error(f"Kafka consumer error: {msg.error()}")
                continue

            value = msg.value()
            if value is None:
                log.warning(f"Received message with None value. Message key: {msg.key()}")
                continue

            try:
                decoded_value = value.decode('utf-8')
            except Exception as decode_ex:
                log.error(f"Failed to decode message: {decode_ex} - raw value: {value}")
                continue

            try:
                jData = json.loads(decoded_value)
                consumer.commit(message=msg)
            except json.JSONDecodeError as json_ex:
                log.error(f"Failed to parse JSON: {json_ex} - message: {decoded_value}")
                continue

            if jData is not None:
                try:
                    consume_business_source_attr(jData)
                except Exception as business_ex:
                    log.error(f"Error in business logic: {business_ex} - message: {jData}")

    except Exception as ex:
        log.exception(f"sourceAttr_consumer crashed with: {ex}")
        retry_count += 1
        sleep_time = min(60, 2 ** retry_count)
        log.warning(f"Retrying consumer in {sleep_time} seconds...")
        time.sleep(sleep_time)
    finally:
        retry_count = 0
        if consumer is not None:
            consumer.commit()
            consumer.close()
            log.info("Kafka consumer closed.")

Expected Behavior

When Service 2 (the consumer) restarts, it should resume consuming from where it left off and receive any messages published while it was offline, such as m4.

Questions

Why is message m4 lost when the consumer group restarts with the same group.id and offset_reset=latest?

Is this expected behavior?

How can I ensure durable message consumption across consumer restarts without changing the group ID every time?


Solution

  • This is not expected behavior, of course.
    I've never used the Python Kafka clients, but:

    consumer.commit(message=msg)
    

    What are you trying to commit here? The parameter should be a dict of {TopicPartition: OffsetAndMetadata}.
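    For illustration, a minimal sketch of that dict form with the kafka-python client (the client choice and broker address are my assumptions; confluent-kafka, by contrast, does accept commit(message=msg), so which client KafkaConnection wraps matters here):

    from kafka import KafkaConsumer
    from kafka.structs import OffsetAndMetadata, TopicPartition

    consumer = KafkaConsumer(
        "eys_device_wins_updated_defSourceAttrs",
        bootstrap_servers="broker1:9092",  # hypothetical address
        group_id="wins-consumer-group-cost-service-01",
        enable_auto_commit=False,
    )
    for msg in consumer:
        tp = TopicPartition(msg.topic, msg.partition)
        # Commit offset + 1 so a restart resumes *after* this message
        # (newer kafka-python versions also accept a leader_epoch field)
        consumer.commit({tp: OffsetAndMetadata(msg.offset + 1, None)})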
    Also, you have commit() in the finally block, but (in the JVM scenario, for example) that block is not guaranteed to execute, e.g. on SIGTERM or Ctrl+Break (SIGINT).
    Usually the consumer is closed via a shutdown hook using .wakeup() plus some atomic flag (because the consumer is not a thread-safe object and cannot be closed from another thread), as in the sketch below.
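    As far as I know the Python clients don't expose wakeup(), but a rough analogue of that pattern (a sketch, assuming confluent-kafka): the signal handler only sets a flag, and the thread that owns the poll loop does the final commit and close itself:

    import signal
    import threading

    shutting_down = threading.Event()

    def _request_shutdown(signum, frame):
        # Only flip the flag here; the consumer is not thread-safe,
        # so never touch it from the signal handler.
        shutting_down.set()

    signal.signal(signal.SIGTERM, _request_shutdown)
    signal.signal(signal.SIGINT, _request_shutdown)

    while not shutting_down.is_set():
        msg = consumer.poll(1.0)
        # ... process msg and commit as in the loop above ...

    # Orderly shutdown from the loop's own thread
    consumer.commit()
    consumer.close()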

    To check your committed offsets, you can run the consumer-groups tool script and describe your group:

    kafka-consumer-groups.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 --describe --group {your group name}
    
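    Illustrative output (all values hypothetical): a CURRENT-OFFSET of "-" means the group never committed anything for that partition, and a positive LAG while the consumer is down means messages like m4 are still waiting to be read:

    GROUP                                TOPIC                                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    wins-consumer-group-cost-service-01  eys_device_wins_updated_defSourceAttrs  0          3               4               1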

    Hope this gives you a clue.