I am using python-kafka to listen to a kafka topic and use that the records. I want to keep it polling infinitely without any exit. This is my code below:
def test():
consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
for msg in consumer:
print(msg.value)
This code just reads the data and exits directly. Is there a way to keep listening to topics even if message is not pushed to it?
Any relevant example where the topic is continuously monitored is also great for me.
Using confluent_kafka
import time
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-1',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])
while True:
try:
message = consumer.poll(10.0)
if not message:
time.sleep(120) # Sleep for 2 minutes
if message.error():
print(f"Consumer error: {message.error()}")
continue
print(f"Received message: {message.value().decode('utf-8')}")
except:
# Handle any exception here
...
finally:
consumer.close()
print("Goodbye")
Using kafka-python
import time
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
group_id='my-consumer-1',
)
consumer.subscribe(['topicName'])
while True:
try:
message = consumer.poll(10.0)
if not message:
time.sleep(120) # Sleep for 2 minutes
if message.error():
print(f"Consumer error: {message.error()}")
continue
print(f"Received message: {message.value().decode('utf-8')}")
except:
# Handle any exception here
...
finally:
consumer.close()
print("Goodbye")