Tags: python, apache-kafka, kafka-consumer-api, confluent-kafka-python

Kafka Consumer first poll(0) returns no data


I'm using the confluent-kafka Python client. I have one producer writing to a topic with a single partition, and one consumer in a single consumer group. First, I create a producer with the default configuration (if the topic doesn't exist, I create it first):

self.producer = confluent_kafka.Producer({"bootstrap.servers": bootstrap_servers})

Then I create a consumer with the default configuration (so auto.offset.reset="latest") and subscribe it to the topic:

self.consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": self.bootstrap_servers, "group.id": self.group_id},
    logger=logger,
)
self.consumer.subscribe(self.topic_names, on_assign=print_assignment)
self.consumer.poll(0)  # first call
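The print_assignment callback is not shown in the question. A minimal sketch of what such a rebalance callback might look like, assuming it only logs the assigned partitions (the message format here is hypothetical):

```python
def print_assignment(consumer, partitions):
    """Rebalance callback: log each partition assigned to this consumer.

    confluent-kafka calls on_assign with the consumer and a list of
    TopicPartition objects, which expose .topic and .partition.
    """
    for p in partitions:
        print(f"assigned: topic={p.topic} partition={p.partition}")
```

The callback is served from within poll(), so nothing is printed until the consumer polls after the group has assigned partitions.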

I noticed that the first self.consumer.poll(0) doesn't seem to register the consumer with the topic, apparently because the topic has no data yet. Next, the producer produces a record, and I call consumer.poll(0) a second time, expecting to get that record. However, it returns None. It looks as if this second call, made after the data was produced, is what actually registers the consumer; only the third call returns the data. How can I register a consumer with a topic when the topic has no data yet?

References: Kafka consumer.poll returns no records


Solution

  • I found that after a consumer subscribes with consumer.subscribe(topic), the first calls to poll(0) may return no data because no partition has been assigned yet. Joining the group, rebalancing, and partition assignment happen asynchronously and take some time to complete before the consumer can fetch. To avoid the empty result, I call poll(0) repeatedly until a partition is assigned to the consumer. With the Python confluent-kafka client, this looks like:

    self.consumer.subscribe(self.topic_names, on_assign=print_assignment)
    while len(self.consumer.assignment()) == 0:
        self.consumer.poll(0)
    

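    With this setup, the consumer already has its partition when the producer writes, so a later poll(0) can return the new record instead of being spent on assignment. Because poll(0) does not block, it may still return None until the record has actually been fetched. Note that this approach fits best when auto.offset.reset is set to "latest": the loop ignores the return value of poll(0), so any message delivered while waiting would be dropped.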
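The loop above spins without a pause and never gives up if the assignment does not arrive. A minimal sketch of a bounded variant, assuming a hypothetical helper named wait_for_assignment (not part of confluent-kafka) that works with any object exposing assignment() and poll(timeout), as confluent_kafka.Consumer does:

```python
import time


def wait_for_assignment(consumer, timeout_s=10.0, poll_interval_s=0.1):
    """Poll until the consumer has at least one assigned partition.

    Returns the messages that poll() happened to deliver while waiting,
    so they are not silently dropped. Raises TimeoutError if no partition
    is assigned within timeout_s seconds.
    """
    deadline = time.monotonic() + timeout_s
    pending = []
    while not consumer.assignment():
        if time.monotonic() >= deadline:
            raise TimeoutError("no partition assigned within the timeout")
        # A short blocking poll serves the rebalance callback without busy-waiting.
        msg = consumer.poll(poll_interval_s)
        if msg is not None:
            pending.append(msg)
    return pending
```

It would be called right after self.consumer.subscribe(...), for example pending = wait_for_assignment(self.consumer). The default timeout and poll interval are arbitrary and should be tuned to the cluster.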