
How to split the result of a GroupByKey() transform by key and write the values to a GCS bucket using Apache Beam Python?


I am new to Apache Beam, Dataflow, and Python, and any help would be appreciated. I need to generate reports by fetching records from a BigQuery table and writing the results to a GCS bucket using Apache Beam in Python. I wrote the pipeline below.

# Here I convert the BigQuery output to a 2-element tuple whose elements are dictionaries, for example:
# ({'institution_id': '100'}, {'customer_id': '1000', 'customer_name': 'ABC', 'customer_email': 'abc@xxx.com', 'phone_number': '00012345'})

import apache_beam as beam


class convtotupleofdict(beam.DoFn):
    def process(self, element):
        # Emit one (key, value) pair: the institution dict is the key, the customer dict is the value.
        return [({'institution_id': element['institution_id']},
                 {'customer_id': element['customer_id'],
                  'customer_name': element['customer_name'],
                  'customer_email': element['customer_email'],
                  'phone_number': element['phone_number']})]


with beam.Pipeline(options=pipeline_options) as p:
    csv_ip = (
        p
        | 'ReadfromBQ' >> beam.io.ReadFromBigQuery(
            query="SELECT institution_id, customer_id, customer_name, customer_email, phone_number "
                  "FROM <table name> WHERE customer_status = 'Active' "
                  "ORDER BY institution_id, customer_id",
            use_standard_sql=True)
        | 'ConvttoTupleofDict' >> beam.ParDo(convtotupleofdict())
        | 'Groupbyinstitution_id' >> beam.GroupByKey()
    )

    op_gcs = (
        csv_ip
        | 'WritetoGCS' >> beam.io.fileio.WriteToFiles(
            path='gs://my-bucket/reports',
            sink=lambda dest: beam.io.fileio.TextSink())
    )

I am using the GroupByKey() transform to group the data by institution_id so that I can split it and create a separate file for each institution_id in the GCS bucket. The GroupByKey() output is as follows:

({'institution_id' :'100'},{'customer_id' : '1000','customer_name': 'ABC','customer_email' : 'abc@xxx.com','phone_number': '00012345'},{'customer_id' : '2000','customer_name': 'XYZ','customer_email' : 'xyz@xxx.com','phone_number': '12378'})
({'institution_id' :'200'},{'customer_id' : '3000','customer_name': 'MNO','customer_email' : 'mno@xxx.com','phone_number': '789102'},{'customer_id' : '4000','customer_name': 'PQR','customer_email' : 'ttt@xxx.com','phone_number': '123789'})

Now I am struggling to convert the GroupByKey() output to a CSV file to upload to the GCS bucket. I learned about beam.io.fileio.WriteToFiles from https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html, which can be used to write files to dynamic destinations. To split the data by institution_id, how should I provide the following parameters of WriteToFiles: path, destination, sink, and file_naming? I understand that destination and sink are callables, but I am not able to build them, and I am confused about the difference between the two. How should I write them to split the data by institution_id and generate one CSV file per institution? For now, I am testing my code with DirectRunner.


Solution

  • I hope this helps.

    Here is a complete solution for your use case.

    Beam main.py file:

    import logging
    
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    
    from your_root_folder.file_io_customer_by_institution_transform import \
        FileIOCustomerByInstitutionTransform
    
    
    def run():
        pipeline_options = PipelineOptions()
    
        with beam.Pipeline(options=pipeline_options) as p:
            input = [
                {
                    'institution_id': '100',
                    'customer_id': '1000',
                    'customer_name': 'ABC',
                    'customer_email': 'abc@xxx.com',
                    'phone_number': '00012345'
                },
                {
                    'institution_id': '100',
                    'customer_id': '1001',
                    'customer_name': 'ABCD',
                    'customer_email': 'abcd@xxx.com',
                    'phone_number': '00012346'
                },
                {
                    'institution_id': '101',
                    'customer_id': '1001',
                    'customer_name': 'ABCD',
                    'customer_email': 'abcd@xxx.com',
                    'phone_number': '00012346'
                }
            ]
    
            (
                    p
                    | beam.Create(input)
                    | "Group customers by institution" >> beam.GroupBy(lambda customer: customer['institution_id'])
                    | "Write file to GCS" >> FileIOCustomerByInstitutionTransform('gs://mazlum_dev/dynamicfiles')
            )
    
    
    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.INFO)
        run()
    

    job_config.py file:

    class JobConfig:
        FILE_ENCODING = 'utf-8'
        FILE_NAME_TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
        CSV_SEPARATOR = ','
    

    file_io_customer_by_institution_transform.py file:

    from datetime import datetime
    from typing import List, Dict, Tuple, Iterable
    
    import apache_beam as beam
    from apache_beam.io.fileio import TextSink, WriteToFiles
    from pytz import timezone
    
    # Adjust this import to wherever job_config.py lives in your project.
    from your_root_folder.job_config import JobConfig
    
    
    class InstitutionDestinationParamError(Exception):
        pass
    
    
    class FileIOCustomerByInstitutionTransform(beam.PTransform):
    
        def __init__(self, output_path: str):
            super().__init__()
            self._output_path = output_path
    
        def expand(self, pcoll):
            return (
                    pcoll
                    | "Write files to GCS" >>
                    WriteToFiles(
                        path=self._output_path,
                        destination=to_institution_destination,
                        file_naming=self.build_file_name,
                        sink=lambda institution_dest: CustomCsvSink())
            )
    
        def build_file_name(self, *args) -> str:
            """
            Build the file name dynamically from the arguments Beam passes to the 'file_naming' callable of the
            'WriteToFiles' PTransform. The destination is the institution (the key of each group in the PCollection),
            and it is received as the 6th positional argument (index 5).
            """
            file_name_timestamp = datetime.now(timezone('Europe/Paris')).strftime(JobConfig.FILE_NAME_TIMESTAMP_FORMAT)
            try:
                institution_destination: str = args[5]
    
                return f'CUSTOMER_INSTITUTION_{institution_destination}_{file_name_timestamp}.csv'
            except Exception as err:
                raise InstitutionDestinationParamError('The institution destination param must be passed') from err
    
    
    class CustomCsvSink(TextSink):
    
        def __init__(self):
            super().__init__()
    
        def write(self, customers_with_institution):
            customers: Iterable[Dict[str, str]] = customers_with_institution[1]
    
            for index, customer in enumerate(customers, start=1):
                if index == 1:
                    header_field_names: bytes = self.build_csv_header_file(customer)
                    self._fh.write(header_field_names)
                    self._fh.write(self.get_csv_line_break())
    
                customer_csv_entry = self.convert_dict_to_csv_record(customer)
    
                self._fh.write(customer_csv_entry)
                self._fh.write(self.get_csv_line_break())
    
        def get_csv_line_break(self) -> bytes:
            return '\n'.encode(JobConfig.FILE_ENCODING)
    
        def build_csv_header_file(self, customer_dict: Dict[str, str]) -> bytes:
            header_field_names: str = JobConfig.CSV_SEPARATOR.join(customer_dict.keys())
    
            return header_field_names.encode(JobConfig.FILE_ENCODING)
    
        def convert_dict_to_csv_record(self, customer_dict: Dict[str, str]) -> bytes:
            """
            Turns dictionary values into a comma-separated value formatted string
            The separator is added to a configuration file
            """
            customer_csv_record: str = JobConfig.CSV_SEPARATOR.join(map(str, customer_dict.values()))
    
            return customer_csv_record.encode(JobConfig.FILE_ENCODING)
    
    
    def to_institution_destination(customers_with_institution: Tuple[str, List[Dict[str, str]]]) -> str:
        """
        Map the given tuple to the institution as destination in 'WriteToFiles' PTransform.
        Then this destination can be used in the 'file_name' part.
        """
        return customers_with_institution[0]
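
The `convert_dict_to_csv_record` helper above joins values with the separator, which is fine for the sample data but would produce a malformed row if a value itself contained a comma or a quote. As a possible variant, here is a minimal sketch that builds the header and rows with Python's standard `csv` module instead. The function name `customers_to_csv_bytes` and the sample value `'ABC, Inc.'` are made up for illustration and are not part of the solution above.

```python
import csv
import io
from typing import Dict, Iterable


def customers_to_csv_bytes(customers: Iterable[Dict[str, str]],
                           encoding: str = 'utf-8') -> bytes:
    """Serialize customer dicts to CSV bytes, with a header taken from the first dict."""
    buffer = io.StringIO()
    writer = None
    for customer in customers:
        if writer is None:
            # The first record defines the column order and triggers the header line.
            writer = csv.DictWriter(buffer,
                                    fieldnames=list(customer.keys()),
                                    lineterminator='\n')
            writer.writeheader()
        writer.writerow(customer)
    return buffer.getvalue().encode(encoding)


rows = [
    # Hypothetical record whose name contains the separator.
    {'customer_id': '1000', 'customer_name': 'ABC, Inc.', 'customer_email': 'abc@xxx.com'},
    {'customer_id': '1001', 'customer_name': 'ABCD', 'customer_email': 'abcd@xxx.com'},
]
csv_bytes = customers_to_csv_bytes(rows)
print(csv_bytes.decode('utf-8'))
```

Inside `CustomCsvSink.write`, the returned bytes could then be handed to `self._fh.write(...)` in a single call, assuming each element still carries all the customers of one institution.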
    

    Some explanations :

    file_io_customer_by_institution_transform.py file :

    In my example :

    To be honest, the Beam Python documentation is incomplete for this kind of use case and for the use of WriteToFiles.
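
Since the documentation is sparse on this point, it may help to see the `file_naming` callable with its arguments spelled out rather than read from `*args`. As far as I understand the `fileio` module, the callable receives six positional arguments: window, pane, shard index, total shards, compression, and destination. The sketch below is plain Python and does not need Beam to run. The function name `institution_file_naming` and the extra `now` keyword (used only to make the output reproducible) are assumptions for illustration.

```python
from datetime import datetime, timezone


def institution_file_naming(window, pane, shard_index, total_shards,
                            compression, destination, now=None):
    """Hypothetical naming callable: the destination is the 6th positional argument.

    `now` is not something Beam passes; it only makes this example deterministic.
    """
    if destination is None:
        raise ValueError('The institution destination must be provided')
    timestamp = (now or datetime.now(timezone.utc)).strftime('%Y%m%d%H%M%S')
    return f'CUSTOMER_INSTITUTION_{destination}_{timestamp}.csv'


# Call it the way the writer would, with placeholder values for the unused arguments.
file_name = institution_file_naming(None, None, 0, 1, None, '100',
                                    now=datetime(2024, 1, 2, 3, 4, 5))
print(file_name)  # CUSTOMER_INSTITUTION_100_20240102030405.csv
```

A function like this could be passed as `file_naming=institution_file_naming` in place of the method that indexes `args[5]`. If a runner ever produces more than one shard for the same destination, adding `shard_index` to the name may be needed to keep the file names distinct.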