I'm currently working on an Apache Beam pipeline that consumes data from three different Kafka topics and, after some processing, builds three types of objects from that data. Finally, I need to publish those three objects to three different Kafka topics.
It is possible to read from multiple topics using the withTopics method in KafkaIO.read, but I did not find a KafkaIO feature for writing to multiple topics.
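For context, this is roughly how I read from the three input topics (the broker addresses and topic names below are placeholders):

PCollection<KV<String, String>> input = pipeline.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        // withTopics subscribes one reader to all three input topics
        .withTopics(Arrays.asList("input_topic_1", "input_topic_2", "input_topic_3"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());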
I would like some advice on the best way to do this, and I'd appreciate it if anyone could provide some code examples.
You can do that with 3 different sinks on a PCollection, for example:
import java.util.Arrays;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Rule;
import org.junit.Test;

// TestPipeline must be a public @Rule so JUnit manages its lifecycle
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();

@Test
public void kafkaIOSinksTest() {
  PCollection<String> inputCollection = pipeline.apply(Create.of(Arrays.asList("Object 1", "Object 2")));
  // First sink: keys are Void, so values() writes just the values to topic1.
  // Note: withValueSerializer expects the serializer class, not an instance.
  inputCollection.apply(KafkaIO.<Void, String>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("topic1")
      .withValueSerializer(StringSerializer.class)
      .values());
  // Second sink: the same PCollection, written to topic2
  inputCollection.apply(KafkaIO.<Void, String>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("topic2")
      .withValueSerializer(StringSerializer.class)
      .values());
  // Third sink: and once more to topic3
  inputCollection.apply(KafkaIO.<Void, String>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("topic3")
      .withValueSerializer(StringSerializer.class)
      .values());
  pipeline.run().waitUntilFinish(); // TestPipeline requires the pipeline to actually be run
}
In this example, the same PCollection is written to three different topics via multiple sinks: Beam simply attaches three independent write transforms to the one PCollection.
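If the topic has to be chosen per element rather than per sink, KafkaIO also provides writeRecords(), which writes a PCollection of Kafka ProducerRecords so that each record can name its own target topic. A minimal sketch, assuming a hypothetical chooseTopic helper that maps an element to its topic name (the explicit coder is set defensively, since coder inference for ProducerRecord may not be automatic):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.kafka.clients.producer.ProducerRecord;

// Wrap each element in a ProducerRecord carrying its target topic.
// chooseTopic(...) is a hypothetical helper: element -> topic name.
inputCollection
    .apply(MapElements
        .into(new TypeDescriptor<ProducerRecord<String, String>>() {})
        .via(value -> new ProducerRecord<>(chooseTopic(value), value)))
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
    .apply(KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));

For a fixed, small set of topics like yours, the three explicit sinks above are simpler; writeRecords is mainly useful when the routing is data-dependent.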