To process Avro-encoded messages with Apache Beam using KafkaIO, you need to pass an instance of ConfluentSchemaRegistryDeserializerProvider as the value deserializer.
A typical example looks like this:
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
    .apply(KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers("kafka-broker:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of("http://my-local-schema-registry:8081", "my_subject")));
However, some of the Kafka topics that I want to consume carry multiple different subjects (event types), for ordering reasons. So I can't provide one fixed subject name in advance. How can this dilemma be solved?
(My goal is to eventually use BigQueryIO to push these events to the cloud.)
You could do multiple reads, one per subject, and then Flatten them.
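A minimal sketch of that approach, assuming the list of subjects is known when the pipeline is constructed (the subject names here are illustrative, not taken from your setup):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.common.serialization.LongDeserializer;

// One read per subject; each read gets its own deserializer provider.
// "event_type_a" / "event_type_b" are placeholder subject names.
List<PCollection<KafkaRecord<Long, GenericRecord>>> perSubject = new ArrayList<>();
for (String subject : Arrays.asList("event_type_a", "event_type_b")) {
  perSubject.add(
      pipeline.apply("ReadSubject_" + subject,
          KafkaIO.<Long, GenericRecord>read()
              .withBootstrapServers("kafka-broker:9092")
              .withTopic("my_topic")
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(
                  ConfluentSchemaRegistryDeserializerProvider.of(
                      "http://my-local-schema-registry:8081", subject))));
}

// Merge the per-subject reads into a single PCollection.
PCollection<KafkaRecord<Long, GenericRecord>> all =
    PCollectionList.of(perSubject).apply(Flatten.pCollections());

Note that Flatten requires all inputs to have the same element type and compatible coders, so if the subjects' Avro schemas differ you may want to map each read to a common representation first (e.g. TableRow, which would also suit a downstream BigQueryIO write) and flatten those collections instead.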