I have a pipeline that loads Pub/Sub messages from a topic into a BigQuery table. To process less data, I want to route each message, based on its "customer_id" field, into a BigQuery table named table-{customer_id}. However, I am struggling to work out how to do this correctly.
Below is my pipeline, which works but writes to a single table rather than one table per customer.
public class PubsubAvroToBigQuery {

    public interface Options extends PipelineOptions, StreamingOptions {
        @Description("The Cloud Pub/Sub topic to read from.")
        @Required
        ValueProvider<String> getInputTopic();

        void setInputTopic(ValueProvider<String> value);

        @Description("BigQuery Table")
        @Required
        ValueProvider<String> getBigQueryTable();

        void setBigQueryTable(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        options.setStreaming(true);
        try {
            run(options);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static PipelineResult run(Options options) throws IOException {
        // Create the pipeline
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);

        // load avro schema from classpath
        Schema avroSchema = ApiCalls.SCHEMA$;

        pipeline
            .apply(
                "Read PubSub Avro Message",
                PubsubIO.readAvroGenericRecords(avroSchema).fromTopic(options.getInputTopic())
            )
            .apply("Write to BigQuery", BigQueryIO.<GenericRecord>write()
                .to(options.getBigQueryTable())
                .useBeamSchema()
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .optimizedWrites());

        return pipeline.run();
    }
}
I ran a lot of tests based on TupleTag and even windows (with grouped data), but none of them worked. I hope you can help. Thank you.
To achieve this, you could use the DynamicDestinations class to choose the TableDestination for each input event.
As shown in the Javadoc, you can use it like this:
events.apply(BigQueryIO.<UserEvent>write()
    .to(new DynamicDestinations<UserEvent, String>() {
        public String getDestination(ValueInSingleWindow<UserEvent> element) {
            // here read your input and extract the customer_id
            return element.getValue().getUserId();
        }

        public TableDestination getTable(String user) {
            // here build the destination table based on what is output by getDestination method
            return new TableDestination(tableForUser(user), "Table for user " + user);
        }

        public TableSchema getSchema(String user) {
            return tableSchemaForUser(user);
        }
    })
    .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() {
        public TableRow apply(UserEvent event) {
            return convertUserEventToTableRow(event);
        }
    }));
Note that you can create a class that extends DynamicDestinations (it is an abstract class) with a constructor to pass parameters that can then be used in the overridden methods.
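As a rough sketch of that last point, the table-naming logic can live in a small serializable helper whose constructor receives the project, dataset, and table prefix. The class name `CustomerTableNamer`, its fields, and the blank-ID check are assumptions made for illustration and are not part of Beam; only the `project:dataset.table` spec format comes from BigQueryIO.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical helper (not a Beam class): builds a per-customer table spec. */
final class CustomerTableNamer implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String project;
    private final String dataset;
    private final String tablePrefix;

    /** Parameters are passed once, at pipeline construction time. */
    CustomerTableNamer(String project, String dataset, String tablePrefix) {
        this.project = Objects.requireNonNull(project, "project");
        this.dataset = Objects.requireNonNull(dataset, "dataset");
        this.tablePrefix = Objects.requireNonNull(tablePrefix, "tablePrefix");
    }

    /** Returns a spec of the form project:dataset.prefix followed by the customer ID. */
    String tableSpecFor(String customerId) {
        // Assumption: a blank customer_id is treated as an error rather than
        // being routed to a fallback table.
        if (customerId == null || customerId.trim().isEmpty()) {
            throw new IllegalArgumentException("customer_id must not be blank");
        }
        return project + ":" + dataset + "." + tablePrefix + customerId;
    }
}
```

A DynamicDestinations subclass could keep an instance of this helper as a field and, in getTable(String customerId), return new TableDestination(namer.tableSpecFor(customerId), "Table for customer " + customerId). For example, a namer built with ("my-project", "my_dataset", "table-") maps customer 42 to my-project:my_dataset.table-42.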