pythonapache-kafkakafka-consumer-apikafka-pythonkafka-topic

Python-Kafka: Keep polling topic infinitely


I am using python-kafka to listen to a kafka topic and use that the records. I want to keep it polling infinitely without any exit. This is my code below:

def test():
    consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)

This code just reads the data and exits directly. Is there a way to keep listening to topics even if message is not pushed to it?

Any relevant example where the topic is continuously monitored is also great for me.


Solution

  • Using confluent_kafka

    import time
    from confluent_kafka import Consumer
    
    
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-consumer-1',
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe(['topicName'])
    
    while True:
        try: 
            message = consumer.poll(10.0)
    
            if not message:
                time.sleep(120) # Sleep for 2 minutes
    
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
    
            print(f"Received message: {message.value().decode('utf-8')}")
        except:
            # Handle any exception here
            ...
        finally:
            consumer.close()
            print("Goodbye")
    

    Using kafka-python

    import time
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
         bootstrap_servers=['localhost:9092'],
         auto_offset_reset='earliest',
         group_id='my-consumer-1',
    )
    consumer.subscribe(['topicName'])
    
    while True:
        try: 
            message = consumer.poll(10.0)
    
            if not message:
                time.sleep(120) # Sleep for 2 minutes
    
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
    
            print(f"Received message: {message.value().decode('utf-8')}")
        except:
            # Handle any exception here
            ...
        finally:
            consumer.close()
            print("Goodbye")