Apache Beam's KafkaIO supports Kafka consumers that read only from specified partitions. I have the following code:
KafkaIO.<String, String>read()
    .withCreateTime(Duration.standardMinutes(1))
    .withReadCommitted()
    .withBootstrapServers(endPoint)
    .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        .build())
    .commitOffsetsInFinalize()
    .withTopicPartitions(List<TopicPartition>)
I had the following two questions, and I found the answers myself.
How do I tell KafkaIO to read from particular partitions?
KafkaIO has the method withTopicPartitions(List<TopicPartition>), which accepts a list of TopicPartition objects.
The partitions of a topic are numbered sequentially, starting from zero. Hence, the following should work:
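KafkaIO.<String, String>read()
    .withCreateTime(Duration.standardMinutes(1))
    .withReadCommitted()
    .withBootstrapServers(endPoint)
    // KafkaIO.read() needs key and value deserializers; StringDeserializer
    // (org.apache.kafka.common.serialization) matches the <String, String> types.
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        .build())
    .commitOffsetsInFinalize()
    // Read only partitions 0, 1 and 2 of the topic.
    .withTopicPartitions(Arrays.asList(
        new TopicPartition(topicName, 0),
        new TopicPartition(topicName, 1),
        new TopicPartition(topicName, 2)))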
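If the topic has many partitions, writing out each TopicPartition by hand gets tedious. Here is a minimal sketch of building the list for partitions 0 to n-1. The class name PartitionLists and the helper firstPartitions are hypothetical names of mine, and the nested TopicPartition class is only a stand-in so the snippet compiles without Kafka on the classpath. In a real pipeline, drop the stand-in and import org.apache.kafka.common.TopicPartition instead.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PartitionLists {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption: present
    // only so this sketch compiles on its own; use the real Kafka class in a pipeline).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Hypothetical helper: returns partitions 0 .. count-1 of the given topic.
    static List<TopicPartition> firstPartitions(String topic, int count) {
        return IntStream.range(0, count)
                .mapToObj(p -> new TopicPartition(topic, p))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same three partitions as in the snippet above, for a topic named "sample".
        List<TopicPartition> partitions = firstPartitions("sample", 3);
        for (TopicPartition tp : partitions) {
            System.out.println(tp.topic + "-" + tp.partition);
        }
    }
}
```

With the real Kafka class, the returned list should be usable directly as the argument to withTopicPartitions.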
To test it out, use kafkacat with the following command, which produces messages to the specified partition (here, partition 0 of the topic sample):
kafkacat -P -b localhost:9092 -t sample -p 0
Does Apache Beam spawn a number of Kafka consumers equal to the number of partitions in the list given when the Kafka consumer is created?
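It will use a single consumer group, with as many consumers as the number of partitions listed explicitly when building the KafkaIO read transform. As far as I can tell, that number is an upper bound: the runner may split the source into fewer readers, each handling several of the listed partitions.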
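To make the relationship between listed partitions and consumers concrete, here is a rough sketch of how a fixed partition list could be spread round-robin over readers, each of which would own one Kafka consumer. This is an illustration under my own assumptions, not Beam's actual code: the class SplitSketch, the method assign, and the desiredSplits parameter are all hypothetical, and how many splits a real runner requests is up to the runner.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    // Hypothetical: spreads partition ids 0 .. numPartitions-1 round-robin over
    // min(desiredSplits, numPartitions) readers (at least one reader).
    // This is a simplified illustration, not Beam's implementation.
    static List<List<Integer>> assign(int numPartitions, int desiredSplits) {
        int numReaders = Math.max(1, Math.min(desiredSplits, numPartitions));
        List<List<Integer>> readers = new ArrayList<>();
        for (int r = 0; r < numReaders; r++) {
            readers.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            readers.get(p % numReaders).add(p);
        }
        return readers;
    }

    public static void main(String[] args) {
        // Three listed partitions, plenty of desired splits: one reader per partition.
        System.out.println(assign(3, 10)); // prints [[0], [1], [2]]
        // Three listed partitions, only two desired splits: one reader takes two partitions.
        System.out.println(assign(3, 2));  // prints [[0, 2], [1]]
    }
}
```

Under these assumptions, the number of readers, and so the number of consumers, never exceeds the number of listed partitions.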