My task is to count messages in Kafka topics (some with one partition, some with many partitions). I tried two techniques: one with subscribe()
and other with assign()
.
Full code:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import kafka
from kafka.structs import TopicPartition
def count_messages(consumer):
i = 0
while True:
records = consumer.poll(50) # timeout in millis
if not records:
break
for _, consumer_records in records.items():
for _ in consumer_records:
i += 1
return i
def show_messageges_assign(kafka_server, topic):
cnt = 0
consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
try:
partitions = consumer.partitions_for_topic(topic)
if partitions:
for partition in partitions:
tp = TopicPartition(topic, partition)
consumer.assign([tp])
consumer.seek(partition=tp, offset=0)
cnt += count_messages(consumer)
finally:
consumer.close()
print('%s assign, partitions: %s, msg cnt=%d' % (topic, partitions, cnt))
def show_messageges_subscribe(kafka_server, topic):
cnt = 0
consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
try:
consumer.subscribe([topic])
cnt += count_messages(consumer)
finally:
consumer.close()
print('%s, subscribe, msg cnt=%d' % (topic, cnt))
def test_topic(kafka_server, topic):
show_messageges_assign(kafka_server, topic)
show_messageges_subscribe(kafka_server, topic)
print('')
def main():
kafka_server = '169.0.1.77:9092'
test_topic(kafka_server, 'gc.ifd.analyse.fdp')
test_topic(kafka_server, 'gc.ifd.result.fdp')
if __name__ == '__main__':
main()
Problems:
subscribe()
do not work on some machines. On the same machine assign()
work, but not very reliable.
assign()
works on all test machines I have, but when there are many partitons I think it do not read from all partitions
Results:
After test there are 1435 messages in two topics. I can see them in web browser "UI for Apache Kafka".
Results on my 4 test machines:
Machine 1 and 2:
[root@test-kafka emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435
[root@igg emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435
Machine 3 and 4:
[mn: emulator] python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0
[root@test-gc emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0
As you see using assign()
I can read from topic with 4 partitions on all test machines, but not all messages are read.
On two machines subscribe()
do not read any message, while on two other it reads all messages.
Is there something wrong with my code or with my environment?
I changed use of poll()
according to StéphaneD. comment:
def count_messages(consumer):
i = 0
try_cnt = 0
while True:
records = consumer.poll(50) # timeout in millis
if not records:
try_cnt += 1
if try_cnt > 10:
break
for _, consumer_records in records.items():
for _ in consumer_records:
i += 1
return i
With this change my programs seems to read all messages with both subscribe()
and assign()
.
When there are no records from consumer.poll()
, do not stop yet: add a retry strategy to check at least if X poll()
are empty in a row, then stop.
I don't think there is any guarantee in Kafka that poll()
will always return something (even if you know there is something), due to Kafka broker algorithms/optimizations.