I need to publish BigQuery table rows to Kafka in Avro format.
    PCollection<TableRow> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)));
    // How to convert rows to Avro format?
    rows.apply(KafkaIO.<Long, ???>write()
        .withBootstrapServers("kafka:29092")
        .withTopic("test")
        .withValueSerializer(KafkaAvroSerializer.class)
    );
How do I convert a TableRow to Avro format?
Use MapElements:
    rows.apply(MapElements.via(new SimpleFunction<TableRow, GenericRecord>() {
        @Override
        public GenericRecord apply(TableRow input) {
            log.info("Parsing {} to Avro", input);
            return null; // TODO: Replace with Avro object
        }
    }));
    // Beam also needs a coder for the GenericRecord output, e.g. .setCoder(AvroCoder.of(schema))
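To fill in the TODO, one option is a small helper that copies TableRow fields into an Avro GenericRecord. This is a minimal sketch assuming a hypothetical table with a required INTEGER column `id` and a nullable STRING column `name`; the class name `TableRowToAvro` and its schema are illustrative, not part of Beam. It relies on TableRow implementing `Map<String, Object>`, and on INTEGER values in TableRows typically arriving as strings, so the code parses them via `String.valueOf`.

```java
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

/** Hypothetical helper: maps a TableRow-like map onto an Avro record with a fixed schema. */
public class TableRowToAvro {
  // Hypothetical schema; replace it with one that matches your BigQuery table.
  public static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"long\"},"
          + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

  /** TableRow implements Map<String, Object>, so a TableRow can be passed here directly. */
  public static GenericRecord convert(Map<String, Object> row) {
    // INTEGER columns usually come back as strings; String.valueOf also handles a Long.
    // Assumes "id" is never null (a REQUIRED column).
    long id = Long.parseLong(String.valueOf(row.get("id")));
    Object name = row.get("name");
    return new GenericRecordBuilder(SCHEMA)
        .set("id", id)
        .set("name", name == null ? null : name.toString()) // nullable union field
        .build();
  }
}
```

Inside `apply(...)` you would then `return TableRowToAvro.convert(input);` and give the output a coder such as `AvroCoder.of(TableRowToAvro.SCHEMA)` (its package differs between Beam versions). Keeping the schema in a static field keeps it out of the serialized function. Depending on your Beam version, `BigQueryIO.read(SchemaAndRecord::getRecord)` with `withCoder(...)` may also let you read GenericRecords directly and skip the TableRow step.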
If each input element is a collection that you want to convert into many records, use FlatMapElements instead.
As for writing to Kafka, I wrote a simple example
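On the Kafka side, Confluent's KafkaAvroSerializer writes the Schema Registry wire format and requires a `schema.registry.url` producer setting. If you are not using a Schema Registry, a minimal alternative is a serializer that writes plain Avro binary. This sketch assumes kafka-clients 2.1+ (where `configure` and `close` have default implementations) and Avro on the classpath; `GenericRecordAvroSerializer` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Hypothetical serializer: writes a GenericRecord as raw Avro binary, without the
 * schema and without a Schema Registry header. Consumers must know the writer schema.
 */
public class GenericRecordAvroSerializer implements Serializer<GenericRecord> {
  @Override
  public byte[] serialize(String topic, GenericRecord record) {
    if (record == null) {
      return null; // a null value is sent as a Kafka tombstone
    }
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
      new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
      encoder.flush(); // the binary encoder buffers, so flush before reading the bytes
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

It can then be passed to `KafkaIO.<Long, GenericRecord>write()` via `.withKeySerializer(LongSerializer.class)` and `.withValueSerializer(GenericRecordAvroSerializer.class)`, applied to a `PCollection<KV<Long, GenericRecord>>`; if you have no keys, calling `.values()` on the write transform accepts a `PCollection<GenericRecord>` instead.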