Hello i use the following configuration for Kafka using docker compose
compose_kafka.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.1.10
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
kafka_manager:
image: kafkamanager/kafka-manager
container_name: kafka-manager
restart: always
ports:
- "9000:9000"
environment:
ZK_HOSTS: "192.168.1.10:2181"
APPLICATION_SECRET: "random-secret"
I create a producer which produces messages to the kafka server
Generate.py
from faker import Faker
fake = Faker()
class Registered_user:
def get_registered_user():
return {
"name": fake.name(),
"address": fake.address(),
"created_at": fake.year()
}
Producer_registered_user.py
import time
import json
from kafka import KafkaProducer
from fake_data import Generate
def json_serializer(data):
return json.dumps(data).encode("utf-8")
producer = KafkaProducer(bootstrap_servers='192.168.1.10:9092',
value_serializer=json_serializer)
if __name__ == '__main__':
while 1 == 1:
user = Generate.Registered_user.get_registered_user()
producer.send('registered_user', user)
print(user)
time.sleep(4)
But the consumer does not receive any messages:
Consumer_registered_user.py
import json
from kafka import KafkaConsumer
if __name__ == '__main__':
consumer = KafkaConsumer(
bootstrap_servers='192.168.1.10:9092',
auto_offset_reset="from-beginning",
group_id="consumer-group-a"
)
for message in consumer:
print("User = {}".format(json.loads(message.value)))
I also checked if the topic is listed under the topics:
and if kafka received the messages:
Could you please help me with this problem?
Check your consumer application configuration,
Sample:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'topic_1', 'topic_2',
bootstrap_servers='192.168.1.10:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group'
)
#Make sure the consumer is successfully subscribing to all required topics
print(consumer.subscription())
for message in consumer:
print(f"Received from {message.topic}: {message.value.decode('utf-8')}")
Add topic to your consumer:
Consumer_registered_user.py
import json
from kafka import KafkaConsumer
if __name__ == '__main__':
consumer = KafkaConsumer(
'registered_user', # Specify the topic you want to consume
bootstrap_servers='192.168.1.10:9092',
auto_offset_reset="earliest", # Correct value
group_id="consumer-group-a"
)
for message in consumer:
print("User = {}".format(json.loads(message.value.decode('utf-8'))))
Issues:
Missing Topic Subscription: You need to pass the topic name ('registered_user') as the first argument to KafkaConsumer to ensure your consumer subscribes to that topic.
Setting auto_offset_reset="earliest"
ensures that if there are no committed offsets, the consumer will start reading from the earliest available message in the topic.
Ref - https://docs.confluent.io/kafka-clients/python/current/overview.html