java, apache-kafka, apache-beam, avro, apache-beam-kafkaio

Send BigQuery table rows to Kafka as Avro messages using Apache Beam


I need to publish BigQuery table rows to Kafka in Avro format.

PCollection<TableRow> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)));

// How do I convert the rows to Avro format?

rows.apply(KafkaIO.<Long, ???>write()
                .withBootstrapServers("kafka:29092")
                .withTopic("test")
                .withValueSerializer(KafkaAvroSerializer.class)
        );

How do I convert a TableRow to Avro format?


Solution

  • Use MapElements

    rows.apply(MapElements.via(new SimpleFunction<TableRow, GenericRecord>() {
      @Override
      public GenericRecord apply(TableRow input) {
        log.info("Parsing {} to Avro", input);
        return null; // TODO: Replace with Avro object
      }
    }));
    

    If your input is a collection type that you want to fan out into many records, you can use FlatMapElements instead. A sketch of the conversion itself follows.
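
    To fill in the TODO above, here is a minimal sketch that builds a GenericRecord per row against an explicit Avro schema. The schema and the "name"/"age" fields are hypothetical; adapt them to your table's columns, and note that BigQuery may hand numeric values back as strings.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical schema; replace the fields with your table's columns.
    final String schemaJson =
        "{\"type\":\"record\",\"name\":\"ExampleRow\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"long\"}]}";

    PCollection<GenericRecord> records =
        rows.apply(
                "TableRow to GenericRecord",
                MapElements.via(
                    new SimpleFunction<TableRow, GenericRecord>() {
                      @Override
                      public GenericRecord apply(TableRow input) {
                        // Parse from the captured String so the function stays
                        // serializable (cache the parsed Schema in production code).
                        Schema schema = new Schema.Parser().parse(schemaJson);
                        GenericRecord record = new GenericData.Record(schema);
                        record.put("name", (String) input.get("name"));
                        // BigQuery often returns numeric columns as strings.
                        record.put("age", Long.parseLong(String.valueOf(input.get("age"))));
                        return record;
                      }
                    }))
            .setCoder(AvroCoder.of(new Schema.Parser().parse(schemaJson)));

    Setting the coder explicitly matters here: Beam cannot infer a coder for GenericRecord on its own, so the pipeline would fail at construction time without it.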

    As for writing to Kafka, here is a simple example.
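
    This is a minimal sketch, assuming a recent Beam version, Confluent's KafkaAvroSerializer on the classpath, and a reachable schema registry; the bootstrap servers, topic, and registry URL are placeholders.

    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import java.util.Collections;
    import org.apache.beam.sdk.io.kafka.KafkaIO;

    records.apply(
        "Write to Kafka",
        KafkaIO.<Void, GenericRecord>write()
            .withBootstrapServers("kafka:29092")
            .withTopic("test")
            // KafkaAvroSerializer is declared as Serializer<Object>, hence the raw cast.
            .withValueSerializer((Class) KafkaAvroSerializer.class)
            // The Confluent serializer needs to know where the schema registry lives.
            .withProducerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    "schema.registry.url", "http://schema-registry:8081"))
            .values()); // write values only; keys are left null

    If you want to keep the Long key from your snippet, drop .values(), produce a PCollection<KV<Long, GenericRecord>> instead, and add .withKeySerializer(LongSerializer.class).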