Setup:
I use pretty standard subscription code:
import logging
from typing import List


def __init__(self, kafka_broker_list: str, group_id: str, topics: List[str]):
    from confluent_kafka import Consumer
    self._consumer = Consumer({
        'bootstrap.servers': kafka_broker_list,
        'fetch.max.bytes': 50 * 1024 * 1024,  # 50 MB
        'auto.offset.reset': 'earliest',
        'group.id': group_id,
        'enable.auto.commit': True
    })
    logging.info(f"Subscribing to topics: {topics}")
    self._consumer.subscribe(topics, on_assign=self._on_assign, on_revoke=self._on_revoke)
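The _on_assign/_on_revoke callbacks are not shown above; a minimal sketch, assuming they only log the rebalance events, would look like this:

def _on_assign(self, consumer, partitions):
    # partitions is a list of confluent_kafka.TopicPartition objects
    logging.info("Assigned: %s", [f"{p.topic}[{p.partition}]" for p in partitions])

def _on_revoke(self, consumer, partitions):
    logging.info("Revoked: %s", [f"{p.topic}[{p.partition}]" for p in partitions])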
The problem: Out of the 120 consumers I start, only 84 (the same as the number of partitions of the largest topic) get a partition assignment; the others receive no partitions at all and therefore sit idle. What's worse, the assignments are heavily skewed: I usually see about 5 consumers with ~10 assigned partitions, some with 8, many with 2-4, and also many consumers with only a single partition. It looks like the "first" consumers to subscribe get partitions from the most topics, until the available partitions of each topic are exhausted.
The question:
I know about the partition.assignment.strategy configuration property, which is available to Java consumers; however, I couldn't find it in the Confluent Kafka client. So is there a way to configure an assignment strategy in the Confluent Kafka Python client? Thank you for taking the time to read my question :)
The confluent-kafka Python client is built on the librdkafka library, which does allow configuring an assignment strategy. Currently two assignment strategies are supported: "range" (the default) and "roundrobin", which solves the problem I described.
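To illustrate why this matters, here is a small standalone simulation (not librdkafka's actual implementation; the consumer and topic names are made up) of how the two strategies would spread two 4-partition topics over six consumers:

from collections import defaultdict
from itertools import cycle

consumers = [f"c{i}" for i in range(6)]
topics = {"topic-a": 4, "topic-b": 4}

# "range": each topic is split independently among the (sorted) consumers,
# so the first consumers receive partitions from every topic while the
# rest receive nothing.
range_assignment = defaultdict(list)
for topic, num_partitions in topics.items():
    base, extra = divmod(num_partitions, len(consumers))
    partition = 0
    for i, consumer in enumerate(consumers):
        for _ in range(base + (1 if i < extra else 0)):
            range_assignment[consumer].append(f"{topic}[{partition}]")
            partition += 1

# "roundrobin": all topic-partitions are laid out in one list and dealt
# to the consumers one by one, which evens out the load.
roundrobin_assignment = defaultdict(list)
consumer_cycle = cycle(consumers)
for topic, num_partitions in topics.items():
    for partition in range(num_partitions):
        roundrobin_assignment[next(consumer_cycle)].append(f"{topic}[{partition}]")

print("range:     ", dict(range_assignment))       # c0..c3 get 2 partitions each, c4/c5 get none
print("roundrobin:", dict(roundrobin_assignment))  # every consumer gets at least 1 partition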
It is enabled by adding the following property to the consumer configuration:
'partition.assignment.strategy': 'roundrobin',
Documentation of all librdkafka properties is available here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
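For completeness, here is a minimal sketch of a consumer configured this way; the broker addresses, group id and topic names below are placeholders:

import logging
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'broker1:9092,broker2:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    # Spread the partitions of all subscribed topics evenly across the group
    'partition.assignment.strategy': 'roundrobin',
})

consumer.subscribe(['topic-a', 'topic-b'])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            logging.error("Consumer error: %s", msg.error())
            continue
        # process the message here
        logging.info("Received %s [%d] @ %d", msg.topic(), msg.partition(), msg.offset())
finally:
    consumer.close()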