python apache-beam dataflow

Writing to Multiple Files from Single PCollection (Beam-Python)


I am working with multiple input files and would like to feed them into a Dataflow pipeline. However, I would like the number of outputs to match the number of inputs. Let's pretend we have three different files:

 gcs_files = ['gs://bucket/<file1_dir>', 'gs://bucket/<file2_dir>', 'gs://bucket/<file3_dir>']

And we want to feed these files into a simple Read->Write pipeline, as follows:

 import apache_beam as beam
 from apache_beam.io import ReadAllFromText
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 
 #Please assume I am using default pipeline options
 print_files = (p | beam.Create(gcs_files) | ReadAllFromText() | WriteToText(<output_path>, shard_name_template='', file_name_suffix='.json'))

The purpose of this pipeline is simple, but I am working on transforming the format of the input files. Hence, for every input file we expect a corresponding transformed output file. The problem with the current setup is that everything is written to a single output file. Is there any way I can preserve the one-to-one mapping with the original files?


Solution

  • You can use ReadAllFromText(with_filename=True); it then emits key-value pairs where the key is the original filename. After applying your transforms to the values, you then use WriteToFiles to dynamically write each element to a destination derived from the original file name (a rough sketch of this approach follows at the end of this answer).

    Or, if the size of your gcs_files is small, you can do everything at pipeline-construction time:

    pcolls = {}
    for gcs_file in gcs_files:
        pcolls[gcs_file] = (p
            | f'Create {gcs_file}' >> beam.Create([gcs_file])
            | f'Read {gcs_file}' >> ReadAllFromText()
            | f'Your Transform {gcs_file}' >> YourTransform()
            # give each file its own output prefix, e.g. one derived from gcs_file
            | f'Write {gcs_file}' >> WriteToText(<per_file_output_path>))
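
    For the first option, here is a rough sketch of what it could look like. The output directory gs://bucket/output and the ValueOnlyTextSink helper are hypothetical names you would adapt, and your own per-line transform goes where the comment indicates:

    import os

    import apache_beam as beam
    from apache_beam.io import ReadAllFromText
    from apache_beam.io import fileio
    from apache_beam.options.pipeline_options import PipelineOptions

    gcs_files = ['gs://bucket/<file1_dir>', 'gs://bucket/<file2_dir>', 'gs://bucket/<file3_dir>']


    class ValueOnlyTextSink(fileio.FileSink):
        """Writes only the value of a (filename, line) pair as a UTF-8 text line."""

        def open(self, fh):
            self._fh = fh

        def write(self, record):
            _, line = record  # drop the filename key, keep the (transformed) line
            self._fh.write(line.encode('utf-8'))
            self._fh.write(b'\n')

        def flush(self):
            self._fh.flush()


    with beam.Pipeline(options=PipelineOptions()) as p:
        _ = (
            p
            | beam.Create(gcs_files)
            | ReadAllFromText(with_filename=True)  # emits (filename, line) pairs
            # apply your per-line transform to the value here, keeping the filename key,
            # e.g. beam.MapTuple(lambda fname, line: (fname, your_transform(line)))
            | fileio.WriteToFiles(
                path='gs://bucket/output',  # assumed output directory
                # group elements by the file they were read from
                destination=lambda kv: os.path.basename(kv[0]),
                sink=lambda dest: ValueOnlyTextSink(),
                # name each output file after its destination, with a .json suffix
                file_naming=fileio.destination_prefix_naming(suffix='.json')))

    Each input file then ends up in its own output file named after the original, which should preserve the one-output-per-input mapping you are after.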