pythongisgoogle-cloud-dataflowapache-beamapache-beam-io

Reading GPKG format file over Apache Beam


I have a requirement to parse and load gpgk extension file to Bigquery table through apache beam (Dataflow runner). I could see that beam has feature called Geobeam, but i couldn't see reference for loading of gpgk files.

Q1: Which Beam library can help me to load geopakage file? Q2: As an alternate solution i am trying to read geopakage file as Binary file and over ParDo can transform it and get it loaded. How we can read Binary file over Apache beam?

Does any one has experience over the same and share experience.

Update: Alternate solution I have a requirement to read Binary Coded file through Python Apache beam (Dataflow as a runner).

I am trying to replicate following example Reading from a new Source over my code to read Binary files.

My code looks is given below, can you help me where its going wrong:-

#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import iobase

#------------Set up BQ parameters-----------------------#
project = 'proj-dev'
dataset_id = 'sandbox'
table_schema_Audit = ('id:STRING,name:STRING')
input = 'gs://bucket/beers.csv'
BUCKET = 'bucket'

#-------------Splitting Of Records----------------------#
class Transaction(iobase.BoundedSource):
    def process(self):
        # Open the Shapefile
        import fiona
        with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
            parsed_data = [[{"id": json.loads(json.dumps(feature['properties']))['Id'],
                        "name": json.loads(json.dumps(feature['properties']))['Name']}] for feature in input_file]
        return parsed_data

def run(argv=None, save_main_session=True):
    pipeline_args = [
      '--project={0}'.format(project),
      '--job_name=loadstructu',
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--region=us-yyyy1',
      '--runner=DataflowRunner',
      '--subnetwork=https://www.googleapis.com/compute/v1/projects/proj-dev/regions/us-yyyy1/subnetworks/xxxxxx-dev-subnet'
    ]

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)

    data_loading = (
        p1
        | 'ReadData' >> beam.io.ReadFromText(Transaction())   
        )

#---------------------Type = load----------------------------------------------------------------------------------------------------------------------
    result = (
    data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
                                                    table='structdata',
                                                    dataset=dataset_id,
                                                    project=project,
                                                    schema=table_schema_Audit,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
                                                    ))

    result = p1.run()
    result.wait_until_finish()

if __name__ == '__main__':
  run()

It popping error as given below:-

~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate, skip_header_lines, delimiter, escapechar, **kwargs)
    772         skip_header_lines=skip_header_lines,
    773         delimiter=delimiter,
--> 774         escapechar=escapechar)
    775 
    776   def expand(self, pvalue):

~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, buffer_size, validate, skip_header_lines, header_processor_fns, delimiter, escapechar)
    133         min_bundle_size,
    134         compression_type=compression_type,
--> 135         validate=validate)
    136 
    137     self._strip_trailing_newlines = strip_trailing_newlines

~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/filebasedsource.py in __init__(self, file_pattern, min_bundle_size, compression_type, splittable, validate)
    110           '%s: file_pattern must be of type string'
    111           ' or ValueProvider; got %r instead' %
--> 112           (self.__class__.__name__, file_pattern))
    113 
    114     if isinstance(file_pattern, str):

TypeError: _TextSource: file_pattern must be of type string or ValueProvider; got <__main__.Transaction object at 0x7fcc79ffc250> instead

Solution

  • solution which worked for me is by adding following 2 additional functions before running beam pipelines :-

    #-------------convert binary file to a txt file----------------------#
    def file_create():
            # Open the Shapefile
            import fiona, json
            f = open("data.txt", "a")
            with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
                for feature in input_file:
                    row = json.loads(json.dumps(feature['properties']))['Id'] + ';' + json.loads(json.dumps(feature['properties']))['Name'] + ';' + str(shape(json.loads(json.dumps(feature['geometry'])))) +'\n'
                    f.write(row)
            f.close()
    
    #---------------Upload newly created file to GCS-------------------------
    def upload():
        client = storage.Client()
        bucket = client.bucket('bucket')
        blob = bucket.blob('data.txt')
        blob.upload_from_filename('data.txt')