
Apache Beam KafkaIO: specify topic partitions instead of a topic name


Apache Beam KafkaIO lets Kafka consumers read only from specified partitions. I have the following code:

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                .commitOffsetsInFinalize()
                .withTopicPartitions(List<TopicPartition>)

I have the following two questions:

  1. How do I get the partition names from Kafka, and how do I specify them in KafkaIO?
  2. Does Apache Beam spawn as many Kafka consumers as there are partitions in the list given when creating the Kafka consumer?

Solution

  • I found the answers myself.

    How do I tell KafkaIO to read from particular partitions?

    KafkaIO.Read has the method withTopicPartitions(List<TopicPartition>), which accepts a list of TopicPartition objects and is used instead of withTopic(...).

    Partitions do not have names; they are identified by sequential numbers starting from zero, so a TopicPartition is simply a topic name plus a partition number. Hence, the following should work:

    KafkaIO.<String, String>read()
                    .withBootstrapServers(endPoint)
                    // KafkaIO requires key and value deserializers
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withCreateTime(Duration.standardMinutes(1))
                    .withReadCommitted()
                    .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                            .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                            .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                            .build())
                    .commitOffsetsInFinalize()
                    // Read only partitions 0, 1 and 2 of topicName (instead of withTopic)
                    .withTopicPartitions(Arrays.asList(
                            new TopicPartition(topicName, 0),
                            new TopicPartition(topicName, 1),
                            new TopicPartition(topicName, 2)))
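If you would rather not hard-code the partition numbers, you can ask the broker for them. The sketch below shows one way to do that, assuming the kafka-clients library (which KafkaIO already needs at runtime) is on the classpath. The class and method names (PartitionLookup, toTopicPartitions, fetchTopicPartitions) are hypothetical; KafkaConsumer.partitionsFor, PartitionInfo and TopicPartition are part of the Kafka client API. The returned list could then be passed to withTopicPartitions(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical helper class; only the Kafka client calls are real API.
public class PartitionLookup {

    // Turns partition metadata into the List<TopicPartition> that withTopicPartitions() takes.
    static List<TopicPartition> toTopicPartitions(List<PartitionInfo> infos) {
        List<TopicPartition> result = new ArrayList<>();
        if (infos == null) {
            return result; // older client versions may return null for an unknown topic
        }
        for (PartitionInfo info : infos) {
            result.add(new TopicPartition(info.topic(), info.partition()));
        }
        return result;
    }

    // Asks the cluster which partitions the topic has (requires a reachable broker).
    static List<TopicPartition> fetchTopicPartitions(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            return toTopicPartitions(consumer.partitionsFor(topic));
        }
    }

    public static void main(String[] args) {
        // Same broker and topic as the kafkacat test; adjust for your cluster.
        System.out.println(fetchTopicPartitions("localhost:9092", "sample"));
    }
}
```

Fetching the list once when the pipeline is built keeps the read definition static; partitions added to the topic later would not be picked up by this approach.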
    

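    To test it, produce messages to a specific partition with kafkacat:

    kafkacat -P -b localhost:9092 -t sample -p 0

    This starts kafkacat as a producer (-P) and writes the lines it reads from standard input to partition 0 (-p 0) of the topic sample. To see which partitions a topic has, kafkacat -L -b localhost:9092 -t sample lists the topic's metadata.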

    Does Apache Beam spawn as many Kafka consumers as there are partitions in the list given when creating the Kafka consumer?

    It creates Kafka consumers that all share the configured group.id, with at most one consumer per partition listed when building the KafkaIO read. If the runner requests fewer splits than there are partitions, some consumers read more than one partition.
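As a rough mental model of how the listed partitions end up spread over consumers, the sketch below assigns partitions round-robin to at most one reader per partition. It is a simplified, hypothetical model (PartitionSplitModel and assign are made-up names), not Beam's source code: desiredSplits stands in for whatever parallelism the runner asks for, and Beam's real split logic depends on the Beam version and runner and may choose the reader count differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of spreading partitions over readers;
// this is NOT Beam's actual split implementation.
public class PartitionSplitModel {

    // Assigns partitions round-robin to min(desiredSplits, partitions.size()) readers.
    static List<List<Integer>> assign(List<Integer> partitions, int desiredSplits) {
        if (desiredSplits <= 0 || partitions.isEmpty()) {
            throw new IllegalArgumentException("need at least one split and one partition");
        }
        // Never create more readers than there are partitions to read.
        int numReaders = Math.min(desiredSplits, partitions.size());
        List<List<Integer>> readers = new ArrayList<>();
        for (int i = 0; i < numReaders; i++) {
            readers.add(new ArrayList<>());
        }
        // The i-th listed partition goes to reader i % numReaders.
        for (int i = 0; i < partitions.size(); i++) {
            readers.get(i % numReaders).add(partitions.get(i));
        }
        return readers;
    }

    public static void main(String[] args) {
        List<Integer> partitions = List.of(0, 1, 2);
        System.out.println(assign(partitions, 2)); // [[0, 2], [1]]
        System.out.println(assign(partitions, 8)); // [[0], [1], [2]]
    }
}
```

In this model, asking for more splits than partitions never produces idle readers, and asking for fewer makes some readers handle several partitions.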