I have a process that I am trying to automate. I already use a dynamic table destination in the WriteToBigQuery class, but I want to do the same for the write_disposition parameter.
Is it possible to do this?
My (not working) code :
# Parser + coder setup.
parser = argparse.ArgumentParser()
coder = CustomCoder()

# Parse known args; anything the parser doesn't recognize is forwarded
# to the Beam runner as pipeline arguments.
_, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions()
dynamics_args = pipeline_options.view_as(DynamicArgs)

# Pipeline init.
# NOTE(review): when `options` is provided, Beam ignores `argv`, so
# passing both is redundant — confirm and drop `argv` if so.
p1 = beam.Pipeline(argv=pipeline_args, options=pipeline_options)

# Read the input file and transform it into BigQuery-ready rows.
lines = (
    p1
    | "Reading file" >> beam.io.ReadFromText(
        dynamics_args.dataLocation,
        skip_header_lines=1,
        strip_trailing_newlines=True,
        coder=coder)
    | "Cleaning file" >> beam.ParDo(CleanFile())
    | "Parsing file" >> beam.ParDo(ParseCSV(dynamics_args.sep))
    | "Mapping schema" >> beam.ParDo(MappingSchema(dynamics_args.destinationSchema))
    | "Cleaning fields" >> beam.ParDo(CleanFields())
    | "Converting numeric" >> beam.ParDo(ConvertNumericToBqFormat(dynamics_args.destinationSchema))
    | "Converting dates" >> beam.ParDo(ConvertDatesToBqFormat(dynamics_args.destinationSchema))
    | "Adding partition field" >> beam.ParDo(AddPartitionField())
    | "Fill null values" >> beam.ParDo(FillNullValues())
)

# Write to BigQuery.
# NOTE(review): the original code built an AsSingleton side input from
# dynamics_args.writeDisposition but never passed it to any transform —
# dead code, removed here. WriteToBigQuery's `write_disposition` expects
# a plain string constant at pipeline-construction time (unlike `table`
# and `schema`, it does not accept a ValueProvider or side input) —
# presumably that is why the dynamic version "did not work"; confirm
# against the Beam BigQuery I/O documentation.
lines | "Writing data" >> beam.io.WriteToBigQuery(
    dynamics_args.destinationTable,
    # Schema is resolved lazily at runtime from the ValueProvider.
    schema=lambda x: castSchema(dynamics_args.destinationSchema.get()),
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=dynamics_args.writeDisposition,
    # Day-partition the destination table on the 'periode' field.
    additional_bq_parameters={'timePartitioning': {'type': 'DAY', 'field': 'periode'}})

p1.run()
Your code looks correct. Whatever write disposition you specify on the command line when you invoke this code should be the write disposition used by the pipeline.