javaapache-flinkdata-stream

How to select kafka topic dynamically in apache flink kafka sink?


I'm using KafkaSink as the sink in my flink application and I require to send stringifiedJSONs to different Kafka topics based on some key-value pairs (for example, a few JSONs go to topic1 and a few other sinks to another topic, topic2 and so on). But I didn't find any way in documentation to configure the Kafka topic to be chosen based on incoming data stream. Can someone please help me with this?

NOTE: I'm using flink version 14.3

    DataStream<String> data = .....
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(parameter.get("bootstrap.servers"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(parameter.get("kafka.output.topic"))
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    data.sinkTo(sink);

Solution

  • I haven't tried this, but I believe that rather than using setTopic to hardwire the sink to a specific topic, you can instead implement the serialize method on a custom KafkaRecordSerializationSchema so that each ProducerRecord it returns specifies the topic it should be written to.

    Another option would be to create a separate sink object for every topic, and then use a ProcessFunction that fans out to set of side outputs, each connected to the appropriate sink.