I am new to Apache Beam, Dataflow and Python, and any help would be appreciated. I have a requirement to generate reports by fetching records from a BigQuery table and writing the results to a GCS bucket using Apache Beam in Python. I wrote the pipeline as below -
# Here I am converting the BigQuery output to a 2-element tuple whose elements are dictionaries, for example:
({'institution_id' :'100'},{'customer_id' : '1000','customer_name': 'ABC','customer_email' : 'abc@xxx.com','phone_number': '00012345'})
class convtotupleofdict(beam.DoFn):
    """Split one row dict into a (key dict, value dict) pair.

    The key dict holds only the 'institiution_id' entry; the value dict
    holds the four customer fields copied from the same row.
    """

    def process(self, element):
        """Return a one-item list containing the (key, value) pair for `element`.

        `element` must support lookup by the five field names used below;
        a missing field raises the lookup error of the underlying mapping.
        """
        # NOTE(review): a dict is used as the grouping key here; whether the
        # downstream GroupByKey accepts dict keys should be confirmed.
        institution_key = {'institiution_id': element['institiution_id']}
        customer_fields = (
            'customer_id',
            'customer_name',
            'customer_email',
            'phone_number',
        )
        customer_details = {field: element[field] for field in customer_fields}
        return [(institution_key, customer_details)]
with beam.Pipeline(options=pipeline_options) as p:
    # The Python literal is double-quoted so that the SQL literal 'Active'
    # can keep its single quotes; adjacent string pieces are concatenated
    # by the parser, so the query stays one string without a raw line break.
    # '<table name>' is a placeholder and must be replaced before running.
    query = (
        "SELECT institiution_id,customer_id,customer_name,customer_email,phone_number "
        "from <table name> where customer_status='Active' "
        "order by institiution_id,customer_id"
    )

    # Parentheses replace the trailing backslashes, so each chain ends
    # exactly where its closing parenthesis is.
    csv_ip = (
        p
        | 'ReadfromBQ' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
        | 'ConvttoTupleofDict' >> beam.ParDo(convtotupleofdict())
        | 'Groupbyinstitution_id' >> beam.GroupByKey()
    )

    op_gcs = (
        csv_ip
        | 'WritetoGCS' >> beam.io.fileio.WriteToFiles(
            path='gs://my-bucket/reports',
            sink=lambda dest: beam.io.fileio.TextSink(),
        )
    )
I am using the GroupByKey() transform to group the data by institution_id, so that I can split the data by institution_id and create a separate file for each institution_id in the GCS bucket. The GroupByKey() output is as follows -
({'institution_id' :'100'},{'customer_id' : '1000','customer_name': 'ABC','customer_email' : 'abc@xxx.com','phone_number': '00012345'},{'customer_id' : '2000','customer_name': 'XYZ','customer_email' : 'xyz@xxx.com','phone_number': '12378'})
({'institution_id' :'200'},{'customer_id' : '3000','customer_name': 'MNO','customer_email' : 'mno@xxx.com','phone_number': '789102'},{'customer_id' : '4000','customer_name': 'PQR','customer_email' : 'ttt@xxx.com','phone_number': '123789'})
Now, I am struggling to convert the GroupByKey() output to a CSV file to upload to the GCS bucket. I got to know about beam.io.fileio.WriteToFiles from https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html, as it can be used for writing files to dynamic destinations. In order to split the data by institution_id, how should I provide the following parameters of WriteToFiles - path, destination, sink and file_naming? I understand that destination and sink are callables, but I am not able to build them. I am stuck at this point and not able to proceed. I am actually getting confused between the destination and sink parameters: how should I write them to split the data by institution_id and generate a CSV file? For now, I am testing my code with DirectRunner.
I hope this helps.
Here is a complete solution for your need.
The Beam main.py file :
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from your_root_folder.file_io_customer_by_institution_transform import \
FileIOCustomerByInstitutionTransform
def run():
    """Build and run the example pipeline.

    Three in-memory customer rows are grouped by their 'institution_id'
    value and handed to FileIOCustomerByInstitutionTransform, which is
    given the output location 'gs://mazlum_dev/dynamicfiles'.
    """
    pipeline_options = PipelineOptions()

    # Mock rows: two customers for institution 100, one for institution 101.
    customers = [
        {
            'institution_id': '100',
            'customer_id': '1000',
            'customer_name': 'ABC',
            'customer_email': 'abc@xxx.com',
            'phone_number': '00012345',
        },
        {
            'institution_id': '100',
            'customer_id': '1001',
            'customer_name': 'ABCD',
            'customer_email': 'abcd@xxx.com',
            'phone_number': '00012346',
        },
        {
            'institution_id': '101',
            'customer_id': '1001',
            'customer_name': 'ABCD',
            'customer_email': 'abcd@xxx.com',
            'phone_number': '00012346',
        },
    ]

    with beam.Pipeline(options=pipeline_options) as p:
        grouped_by_institution = (
            p
            | beam.Create(customers)
            | "Group customers by institution" >> beam.GroupBy(
                lambda customer: customer['institution_id'])
        )
        _ = (
            grouped_by_institution
            | "Write file to GCS" >> FileIOCustomerByInstitutionTransform(
                'gs://mazlum_dev/dynamicfiles')
        )
# Script entry point: the body only runs when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    # Set the root logger level to INFO before the pipeline starts.
    logging.getLogger().setLevel(logging.INFO)
    run()
The job_config.py file :
class JobConfig:
    """Constants shared by the CSV export job."""

    # Separator placed between the fields of a CSV line.
    CSV_SEPARATOR: str = ','
    # Encoding used when CSV text is converted to bytes.
    FILE_ENCODING: str = 'utf-8'
    # strftime pattern for the timestamp embedded in generated file names.
    FILE_NAME_TIMESTAMP_FORMAT: str = '%Y%m%d%H%M%S'
The file_io_customer_by_institution_transform.py file :
import csv
import io
from datetime import datetime
from typing import List, Dict, Tuple, Iterable

import apache_beam as beam
from apache_beam.io.fileio import TextSink, WriteToFiles
from pytz import timezone

from integration_ocd.pythonjobs.common.module.job_config import JobConfig
class InstitutionDestinationParamError(Exception):
    """Raised when the institution destination is missing from the file-naming arguments."""
class FileIOCustomerByInstitutionTransform(beam.PTransform):
    """Write each group of customers to a file named after its institution.

    The transform wraps `WriteToFiles`, using `to_institution_destination`
    to derive the destination of each element, `build_file_name` to name
    the output file, and a fresh `CustomCsvSink` per destination.
    """

    def __init__(self, output_path: str):
        """Store the base path under which the files are written.

        Args:
            output_path: value passed as `path` to `WriteToFiles`.
        """
        super().__init__()
        self._output_path = output_path

    def expand(self, pcoll):
        """Apply `WriteToFiles` to `pcoll` and return its result."""
        return (
            pcoll
            | "Write files to GCS" >>
            WriteToFiles(
                path=self._output_path,
                destination=to_institution_destination,
                file_naming=self.build_file_name,
                sink=lambda institution_dest: CustomCsvSink())
        )

    def build_file_name(self, *args) -> str:
        """Build the output file name from the file-naming arguments.

        The institution destination is read from index 5 of `args`, i.e.
        the sixth positional argument. NOTE(review): this assumes the
        file-naming callable receives
        (window, pane, shard_index, total_shards, compression, destination),
        which should be confirmed against the Beam fileio documentation.

        Returns:
            'CUSTOMER_INSTITUTION_<destination>_<timestamp>.csv', where the
            timestamp is the current Europe/Paris time formatted with
            `JobConfig.FILE_NAME_TIMESTAMP_FORMAT`.

        Raises:
            InstitutionDestinationParamError: if fewer than six positional
                arguments were passed.
        """
        file_name_timestamp = datetime.now(timezone('Europe/Paris')).strftime(JobConfig.FILE_NAME_TIMESTAMP_FORMAT)

        # Only the positional lookup can fail here, so only IndexError is
        # translated; the original error is kept as the cause for debugging.
        try:
            institution_destination: str = args[5]
        except IndexError as err:
            raise InstitutionDestinationParamError(
                'The institution destination param must be passed', err) from err

        return f'CUSTOMER_INSTITUTION_{institution_destination}_{file_name_timestamp}.csv'
class CustomCsvSink(TextSink):
    """Sink that renders a group of customer dicts as CSV lines.

    Every call to `write` emits a header line taken from the keys of the
    first customer, followed by one line per customer. Fields are rendered
    with the standard `csv` writer, so values containing the separator,
    double quotes or line breaks are quoted instead of corrupting the file.

    NOTE(review): `self._fh` is not assigned in this class; it is presumed
    to be the binary file handle set by `TextSink` - confirm.
    """

    def write(self, customers_with_institution):
        """Write the header and the CSV lines for one grouped element.

        Args:
            customers_with_institution: indexable pair whose item at
                index 1 is an iterable of customer dicts.
        """
        customers: Iterable[Dict[str, str]] = customers_with_institution[1]
        line_break = self.get_csv_line_break()

        for position, customer in enumerate(customers):
            # The header is derived from the first customer of this element,
            # so it is emitted once per call, before the first record.
            if position == 0:
                self._fh.write(self.build_csv_header_file(customer))
                self._fh.write(line_break)

            self._fh.write(self.convert_dict_to_csv_record(customer))
            self._fh.write(line_break)

    def get_csv_line_break(self) -> bytes:
        """Return the encoded line terminator placed after every CSV line."""
        return '\n'.encode(JobConfig.FILE_ENCODING)

    def build_csv_header_file(self, customer_dict: Dict[str, str]) -> bytes:
        """Return the encoded CSV header line built from the dict keys."""
        return self._encode_csv_row(customer_dict.keys())

    def convert_dict_to_csv_record(self, customer_dict: Dict[str, str]) -> bytes:
        """Return the encoded CSV line built from the dict values.

        Values are converted with `str` before being written; the separator
        comes from `JobConfig.CSV_SEPARATOR`.
        """
        return self._encode_csv_row(customer_dict.values())

    @staticmethod
    def _encode_csv_row(fields) -> bytes:
        """Render `fields` as one CSV line (without terminator) and encode it.

        The `csv` writer requires `JobConfig.CSV_SEPARATOR` to be a single
        character.
        """
        buffer = io.StringIO()
        # lineterminator='' because the caller writes the line break itself.
        writer = csv.writer(buffer, delimiter=JobConfig.CSV_SEPARATOR, lineterminator='')
        writer.writerow([str(field) for field in fields])
        return buffer.getvalue().encode(JobConfig.FILE_ENCODING)
def to_institution_destination(customers_with_institution: Tuple[str, List[Dict[str, str]]]) -> str:
    """Return the institution part of a grouped element.

    The value at index 0 of the given pair is returned unchanged, so it can
    serve as the destination for 'WriteToFiles' and later be used when the
    file name is built.
    """
    institution = customers_with_institution[0]
    return institution
Some explanations :
- A group by is applied with Beam on the institution_id field.
- A PTransform contains all the logic to write dynamic files, in the file_io_customer_by_institution_transform.py file :
  - It uses the JobConfig object.
  - A custom CSVSink (CustomCsvSink) is created to generate the CSV lines from elements in the PCollection.
  - The filename is generated from the current institution ID and the current timestamp.
  - The destination is the institution ID.
In my example :
- file 1, with institution 100 : CUSTOMER_INSTITUTION_100_20221011160828.csv => contains 2 CSV lines
- file 2, with institution 101 : CUSTOMER_INSTITUTION_101_20221011160828.csv => contains 1 CSV line
To be honest, the Beam Python documentation is not complete for this kind of use case and for the use of WriteToFiles.