I have a project using Kafka in KRaft mode, with two microservices. I’m running Kafka with 3 brokers and 3 controllers, using the following Docker image: confluentinc/cp-kafka:7.8.0
Here’s the scenario:
1. Service 1 (the producer) publishes m1, m2, m3.
2. Service 2 (the consumer) consumes m1, m2, m3.
3. Service 2 goes offline.
4. While Service 2 is down, Service 1 publishes m4.
5. Service 2 restarts.
6. Service 1 publishes m5, and Service 2 successfully consumes m5.
7. The message m4 is completely lost on the consumer side — it is never received or processed.
In the non-working version, the consumer is initialized like this:
consumer = KafkaConnection(
    group_id="wins-consumer-group-cost-service-01",
    offset_reset="latest"
)
When I change the consumer to use a dynamic group.id and offset_reset='earliest', the message is received:
consumer = KafkaConnection(
    group_id=f"wins-consumer-group-cost-service-{uuid.uuid4().hex[:4]}",
    offset_reset="earliest",
    enable_auto_commit=False,
    auto_commit_interval_ms=1000
)
However, using a new group ID every time prevents me from maintaining durable state across restarts, and is not practical in production.
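For context, the setup I would expect to be durable is a fixed group.id plus offset_reset="earliest" with manual commits; as I understand it, auto.offset.reset only applies when the group has no committed offsets, so "earliest" should not cause reprocessing once commits are in place. A sketch, assuming our KafkaConnection wrapper simply forwards these options to the underlying client:

consumer = KafkaConnection(
    group_id="wins-consumer-group-cost-service-01",  # fixed, so committed offsets survive restarts
    offset_reset="earliest",       # only used when the group has no committed offset
    enable_auto_commit=False,      # commit manually, after the message is processed
)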
Consumer code block:
def sourceAttr_consumer():
    consumer = None
    retry_count = 0
    topic_name = 'eys_device_wins_updated_defSourceAttrs'
    try:
        while True:
            # Recreate the consumer if it does not exist or was closed
            if consumer is None:
                try:
                    dynamic_suffix = uuid.uuid4().hex[:4]  # unused in this (non-working) version
                    log.info("Creating new Kafka consumer...")
                    consumer = KafkaConnection(
                        group_id="wins-consumer-group-cost-service-01",
                        offset_reset="latest",
                    )
                    # Create the topic if it does not exist
                    if not consumer.topic_exists(topic_name):
                        consumer.add_topic(topic_name)
                    consumer = consumer.create_consumer()
                    consumer.subscribe([topic_name])
                    log.info(f"Subscribed to Kafka topic: {topic_name}")
                except Exception as conn_ex:
                    log.error(f"Failed to create Kafka consumer: {conn_ex}")
                    consumer = None
                    time.sleep(5)
                    continue
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                log.error(f"Consumer error: {msg.error()}")
                continue
            value = msg.value()
            if value is None:
                log.warning(f"Received message with None value. Message key: {msg.key()}")
                continue
            try:
                decoded_value = value.decode('utf-8')
            except Exception as decode_ex:
                log.error(f"Failed to decode message: {decode_ex} - raw value: {value}")
                continue
            try:
                jData = json.loads(decoded_value)
                consumer.commit(message=msg)
            except json.JSONDecodeError as json_ex:
                log.error(f"Failed to parse JSON: {json_ex} - message: {decoded_value}")
                continue
            if jData is not None:
                try:
                    consume_business_source_attr(jData)
                except Exception as business_ex:
                    log.error(f"Error in business logic: {business_ex} - message: {jData}")
    except Exception as ex:
        log.exception(f"sourceAttr_consumer crashed with: {ex}")
        retry_count += 1
        sleep_time = min(60, 2 ** retry_count)
        log.warning(f"Retrying consumer in {sleep_time} seconds...")
        time.sleep(sleep_time)
    finally:
        retry_count = 0
        if consumer is not None:
            consumer.commit()
            consumer.close()
            log.info("Kafka consumer closed.")
Expected Behavior

When Service 2 (the consumer) restarts, it should resume consuming from where it left off — and receive any messages published while it was offline, such as m4.
Questions

1. Why is the message m4 lost when the consumer group restarts with the same group.id and offset_reset=latest?
2. Is this expected behavior?
3. How can I ensure durable message consumption across consumer restarts without changing the group ID every time?
This is not expected behavior, of course.

I've never used the Python Kafka clients, but:

consumer.commit(message=msg)

What are you trying to commit here? The parameter should be a dict of {TopicPartition: OffsetAndMetadata}. And note the consequence: if your commits never actually land, the group has no committed offsets, so on restart offset_reset=latest sends the consumer straight to the end of the log, which is exactly how m4 gets skipped.
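A sketch of that dict form, assuming the kafka-python client (if your KafkaConnection wraps confluent-kafka instead, its commit() has a different signature, so check which client is underneath):

from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata

# Commit the offset of the *next* message to read, not msg.offset itself
tp = TopicPartition(msg.topic, msg.partition)
consumer.commit({tp: OffsetAndMetadata(msg.offset + 1, None)})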
Also, you have commit() in the finally block, but that block is not guaranteed to execute (in the JVM case, for example, it can be skipped on SIGTERM or Ctrl+Break (SIGINT)).
Usually the consumer is closed via a shutdown hook using .wakeup() plus some atomic flag, because the consumer is not a thread-safe object and cannot be closed from another thread.
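In Python the same idea looks roughly like this (a sketch; it assumes `consumer` is the already-created consumer from your loop, and the signal handler only sets a flag so the consumer is never touched from another thread):

import signal
import threading

shutdown = threading.Event()

def _request_shutdown(signum, frame):
    # Never close the consumer here: only record that shutdown was requested
    shutdown.set()

signal.signal(signal.SIGTERM, _request_shutdown)
signal.signal(signal.SIGINT, _request_shutdown)

while not shutdown.is_set():
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    # ... decode, process, commit as in the question ...

# The flag was set: commit and close from the polling thread itself
consumer.commit()
consumer.close()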
In order to check your committed offsets, you can run the console tool and describe your group to see the offsets:

kafka-consumer-groups.sh --bootstrap-server broker1:30903,broker2:30448,broker3:30805 --describe --group {your group name}
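The output looks roughly like this (numbers made up, extra columns trimmed); if CURRENT-OFFSET shows "-", the group has never committed anything, and auto.offset.reset alone decides where a restarted consumer starts:

GROUP                                TOPIC                                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
wins-consumer-group-cost-service-01  eys_device_wins_updated_defSourceAttrs   0          3               5               2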
Hope this gives you some clue.