javaspringapache-kafkaspring-batchspring-kafka

How do I setup KafkaItemReaderBuilder to read all partitions?


I currently use the partitions method with the list of partitions to be consumed :

partitions(0, 1, 2)

If a partition 3 is added, it will not consume it. How do I tell KafkaItemReaderBuilder I want to read all partitions?

I tried not using the partitions method at all, but it turned out it was mandatory.

Also, the javadoc did not mention how to do :

public KafkaItemReaderBuilder<K,V> partitions(Integer... partitions) A list of partitions to manually assign to the consumer. Parameters: partitions - list of partitions to assign to the consumer Returns: The current instance of the builder.


Solution

  • To consume all partitions dynamically with KafkaItemReaderBuilder, you'll need to manually retrieve the list of partitions using Kafka's AdminClient and then pass that list to the partitions method. This approach allows your application to adapt if the number of partitions changes.

    Here's a simple way to fetch partitions and use them:

    List<Integer> partitions = adminClient.describeTopics(List.of("your_topic"))
        .all().get().get("your_topic").partitions().stream()
        .map(TopicPartitionInfo::partition)
        .collect(Collectors.toList());
    
    KafkaItemReaderBuilder<String, String> readerBuilder = new KafkaItemReaderBuilder<String, String>()
        .partitions(partitions.toArray(new Integer[0]))
        .topic("your_topic").build();
    

    If new partitions are added, you would need to rerun this logic to include them in the reader.