java, apache-flink, apache-beam, apache-beam-kafkaio

Apache Beam KafkaIO - Write to Multiple Topics


Currently, I'm working on an Apache Beam pipeline implementation that consumes data from three different Kafka topics and, after some processing, creates three types of objects from that data. Finally, those three objects need to be published to three different Kafka topics. It is possible to read from multiple topics using the withTopics method of KafkaIO.read, but I did not find a KafkaIO feature for writing to multiple topics.
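
For reference, the multi-topic read looks roughly like this (a minimal sketch; the broker addresses and topic names here are placeholders):

    pipeline.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopics(Arrays.asList("input_topic_1", "input_topic_2", "input_topic_3"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));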

I would like some advice on the most idiomatic way to do this; code examples would be much appreciated.


Solution

  • You can do that by applying 3 different sinks to the same PCollection, for example:

        import java.util.Arrays;

        import org.apache.beam.sdk.io.kafka.KafkaIO;
        import org.apache.beam.sdk.testing.TestPipeline;
        import org.apache.beam.sdk.transforms.Create;
        import org.apache.beam.sdk.values.PCollection;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.junit.Rule;
        import org.junit.Test;

        // TestPipeline must be a public @Rule field to work under JUnit 4.
        @Rule
        public final transient TestPipeline pipeline = TestPipeline.create();

        @Test
        public void kafkaIOSinksTest() {
            PCollection<String> inputCollection = pipeline.apply(Create.of(Arrays.asList("Object 1", "Object 2")));

            // Each apply below attaches an independent Kafka sink to the same PCollection.
            // Note that withValueSerializer expects the serializer class, not an instance.
            inputCollection.apply(KafkaIO.<Void, String>write()
                    .withBootstrapServers("broker_1:9092,broker_2:9092")
                    .withTopic("topic1")
                    .withValueSerializer(StringSerializer.class)
                    .values());

            inputCollection.apply(KafkaIO.<Void, String>write()
                    .withBootstrapServers("broker_1:9092,broker_2:9092")
                    .withTopic("topic2")
                    .withValueSerializer(StringSerializer.class)
                    .values());

            inputCollection.apply(KafkaIO.<Void, String>write()
                    .withBootstrapServers("broker_1:9092,broker_2:9092")
                    .withTopic("topic3")
                    .withValueSerializer(StringSerializer.class)
                    .values());

            pipeline.run().waitUntilFinish();
        }
    

    In this example, the same PCollection is written to 3 different topics via multiple sinks.
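
    If the target topic must instead be chosen per element at runtime, a possible alternative is KafkaIO.writeRecords(), which accepts ProducerRecord values that can each carry their own topic name. The sketch below is an assumption-laden illustration, not part of the original answer: chooseTopic is a hypothetical helper that maps an element to its target topic.

        import org.apache.beam.sdk.coders.StringUtf8Coder;
        import org.apache.beam.sdk.coders.VoidCoder;
        import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
        import org.apache.beam.sdk.transforms.MapElements;
        import org.apache.beam.sdk.transforms.SimpleFunction;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.VoidSerializer;

        // Wrap each element in a ProducerRecord that names its own destination topic.
        PCollection<ProducerRecord<Void, String>> producerRecords = inputCollection
                .apply(MapElements.via(new SimpleFunction<String, ProducerRecord<Void, String>>() {
                    @Override
                    public ProducerRecord<Void, String> apply(String value) {
                        // chooseTopic is a hypothetical helper picking the topic per element
                        return new ProducerRecord<>(chooseTopic(value), value);
                    }
                }))
                // Set the coder explicitly, since it cannot be inferred for ProducerRecord.
                .setCoder(ProducerRecordCoder.of(VoidCoder.of(), StringUtf8Coder.of()));

        producerRecords.apply(KafkaIO.<Void, String>writeRecords()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withKeySerializer(VoidSerializer.class)
                .withValueSerializer(StringSerializer.class));

    Depending on the Beam version, a default topic set via withTopic(...) may still be required when some records do not specify one.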