python apache-kafka kafka-consumer-api kafka-python

How to close a Kafka consumer once all messages are consumed?


I have the following program, which consumes all the messages arriving on a Kafka topic.

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print("%s key=%s value=%s" % (message.topic, message.key, message.value))
consumer.close()

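With the program above I am able to consume all the messages arriving on the topic. However, once all messages have been consumed I want to close the Kafka consumer, and that never happens: the loop keeps waiting and consumer.close() is not reached. How can I close the consumer at that point?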


Solution

  • I am now able to close the Kafka consumer by passing the consumer_timeout_ms argument to the KafkaConsumer constructor. It takes a timeout value in milliseconds. Below is the code snippet.

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer('my_test_topic',
                             group_id='my-group',
                             bootstrap_servers=['my_kafka:9092'],
                             consumer_timeout_ms=1000)
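    # The loop ends once no message arrives for consumer_timeout_ms.
    for message in consumer:
        consumer.commit()
        print("%s key=%s value=%s" % (message.topic, message.key, message.value))
    # Reached only after the iterator has timed out.
    consumer.close()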
    

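    In the code above, if the consumer does not receive any message for 1 second (consumer_timeout_ms=1000), the iterator stops, the for loop ends, and consumer.close() is called. Note that this treats a quiet period as "all messages consumed"; if messages can arrive more than 1 second apart, use a larger timeout.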
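
The same idea can also be written with an explicit poll loop, which makes the stop condition visible. The following is only a minimal sketch: the names consume_until_idle, handle and idle_timeout_ms are made up for illustration and are not part of kafka-python. It assumes the consumer object provides poll(timeout_ms=...), commit() and close(), as KafkaConsumer does, and that poll() returns an empty dict when nothing arrived within the timeout.

```python
def consume_until_idle(consumer, handle=print, idle_timeout_ms=1000):
    """Process messages until one poll returns nothing, then close.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer:
    poll() returns a dict mapping each partition to a list of records.
    Returns the number of messages handled.
    """
    handled = 0
    try:
        while True:
            batches = consumer.poll(timeout_ms=idle_timeout_ms)
            if not batches:
                # Nothing arrived within the timeout: treat as "done".
                break
            for records in batches.values():
                for message in records:
                    handle(message)
                    handled += 1
            # Commit once per batch, after the batch has been processed.
            consumer.commit()
    finally:
        # Close the consumer even if handle() raised an exception.
        consumer.close()
    return handled
```

With kafka-python this would be called as consume_until_idle(KafkaConsumer('my_test_topic', group_id='my-group', bootstrap_servers=['my_kafka:9092'])). As with consumer_timeout_ms, an empty poll only means the topic was quiet for that long, so the timeout should be chosen larger than the longest expected gap between messages (and long enough for the initial group join to finish).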