kafka-consumer-api confluent-platform confluent-kafka-python

Some Python Confluent Kafka consumers stay idle/unassigned even though others are overloaded/over-assigned


Setup:

I use pretty standard subscription code:

import logging
from typing import List

from confluent_kafka import Consumer

# Constructor of the consumer wrapper class (rest of the class omitted)
def __init__(self, kafka_broker_list: str, group_id: str, topics: List[str]):
    self._consumer = Consumer({
        'bootstrap.servers': kafka_broker_list,
        'fetch.max.bytes': 50 * 1024 * 1024,  # 50MB
        'auto.offset.reset': 'earliest',
        'group.id': group_id,
        'enable.auto.commit': True
    })
    logging.info(f"Subscribing for topics: {topics}")
    # _on_assign / _on_revoke are rebalance callbacks defined elsewhere in the class
    self._consumer.subscribe(topics, on_assign=self._on_assign, on_revoke=self._on_revoke)

The problem: Of the 120 consumers I start, only 84 (the number of partitions of the largest topic) get a partition assignment; the others get none and remain idle. What's worse, the load is uneven: I usually get about 5 consumers with ~10 assigned partitions, some with 8, many with 2 to 4, and also many with only a single partition. I believe the "first" consumers to subscribe get partitions from the most topics, until the available partitions of each topic are exhausted.

The questions:

  1. I read about the partition.assignment.strategy configuration property which is available to Java Consumers, however I couldn't find it in the Confluent Kafka Client. So is there a way to configure an assignment strategy in Confluent Kafka Python Client?
  2. Is there a way to set a partition assignment strategy on the server, or per topic or per group ID?
  3. Alternatively, is there a different way to distribute the load between all consumers?

Thank you for taking the time to read my question :)


Solution

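  • The confluent-kafka Python client internally uses the librdkafka library, which does allow configuring an assignment strategy. At the time of writing, two assignment strategies are supported: "range" (the default) and "roundrobin", which solves the problem I described. "range" assigns each topic's partitions separately, so the same consumers at the front of the member list receive partitions from every topic, whereas "roundrobin" spreads the partitions of all subscribed topics across all consumers in the group.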

    It is configured by adding the following configuration property to the consumer configuration:

    'partition.assignment.strategy': 'roundrobin',
    

    Documentation of all librdkafka properties is available here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
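To illustrate why "range" leaves consumers idle while "roundrobin" does not, here is a minimal pure-Python sketch of the two strategies. It is a simplified model, not librdkafka's actual implementation: it assumes every consumer subscribes to the same topics, and the topic names, partition counts (84, 30, 6) and consumer names are made up for the example.

```python
from itertools import cycle
from typing import Dict, List


def range_assign(topics: Dict[str, int], consumers: List[str]) -> Dict[str, List[str]]:
    """Simplified 'range': each topic is split on its own, in sorted consumer order."""
    assignment: Dict[str, List[str]] = {c: [] for c in consumers}
    ordered = sorted(consumers)
    for topic, n_partitions in sorted(topics.items()):
        per_consumer, extra = divmod(n_partitions, len(ordered))
        start = 0
        for i, consumer in enumerate(ordered):
            # The first `extra` consumers get one partition more than the rest.
            count = per_consumer + (1 if i < extra else 0)
            assignment[consumer] += [f"{topic}-{p}" for p in range(start, start + count)]
            start += count
    return assignment


def roundrobin_assign(topics: Dict[str, int], consumers: List[str]) -> Dict[str, List[str]]:
    """Simplified 'roundrobin': all partitions of all topics are dealt out in turn."""
    assignment: Dict[str, List[str]] = {c: [] for c in consumers}
    ring = cycle(sorted(consumers))
    for topic, n_partitions in sorted(topics.items()):
        for p in range(n_partitions):
            assignment[next(ring)].append(f"{topic}-{p}")
    return assignment


def summarize(assignment: Dict[str, List[str]]) -> Dict[str, int]:
    """Count idle consumers and the largest number of partitions held by one consumer."""
    sizes = [len(parts) for parts in assignment.values()]
    return {"idle": sizes.count(0), "max_partitions": max(sizes)}


if __name__ == "__main__":
    # Hypothetical topics: one large topic with 84 partitions plus two smaller ones.
    topics = {"topic_a": 84, "topic_b": 30, "topic_c": 6}
    consumers = [f"consumer-{i:03d}" for i in range(120)]
    print("range:     ", summarize(range_assign(topics, consumers)))
    print("roundrobin:", summarize(roundrobin_assign(topics, consumers)))
```

In this model, with 120 consumers "range" never assigns a partition to more than 84 of them, because no single topic has more than 84 partitions, and the first consumers in sorted order pick up a partition from every topic. "roundrobin" deals the 120 partitions out across all 120 consumers.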