I have the following code:
from confluent_kafka.admin import AdminClient, NewTopic
a = AdminClient({'bootstrap.servers': 'localhost:9092'})
new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
fs = a.create_topics(new_topics)
for topic, f in fs.items():
try:
f.result()
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
Creating the topics worked fine.
This is my producer:
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
some_data_source = ["hello", "wuff"]
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for data in some_data_source:
p.poll(0)
p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
p.flush()
Message delivered to mytopic [0]
Message delivered to mytopic [0]
Consumer:
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
})
c.subscribe(['topic1'])
while True:
msg = c.poll(1.0)
print(msg)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
When I subscribe to the topic (which works), I only get None
every second. Am I doing something wrong here? Does it has to do something with 'group.id': 'mygroup'
? Can anyone help me?
Your producer code is writing to mytopic
topic. Which doesn't match your create script or what your consumer has subscribed to.
Also, if you don't want it to print None, then move the print statement inside the if statement since poll function can return None
As commented, you may also want to try further debugging with CLI tools