I have a requirement to parse a GeoPackage (.gpkg) file and load it into a BigQuery table through Apache Beam (Dataflow runner). I can see that Beam has a related project called geobeam, but I couldn't find any reference for loading .gpkg files.
Q1: Which Beam library can help me load a GeoPackage file?
Q2: As an alternative, I am trying to read the GeoPackage file as a binary file and transform it in a ParDo so it can be loaded. How can I read a binary file with Apache Beam?
Does anyone have experience with this that they can share?
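To make Q2 concrete, the shape of pipeline I have in mind is roughly the sketch below, using apache_beam.io.fileio to match and read whole files as bytes (the bucket path and the ParseGpkgBytes DoFn are placeholders I made up; I have not got this working end to end yet):

import apache_beam as beam
from apache_beam.io import fileio

class ParseGpkgBytes(beam.DoFn):
    def process(self, readable_file):
        raw_bytes = readable_file.read()  # whole file content as bytes
        # ...parse raw_bytes here and yield one dict per record...
        yield {"path": readable_file.metadata.path, "size": len(raw_bytes)}

with beam.Pipeline() as p:
    rows = (
        p
        | 'MatchFiles' >> fileio.MatchFiles('gs://bucket/*.gpkg')
        | 'ReadMatches' >> fileio.ReadMatches()
        | 'ParseBytes' >> beam.ParDo(ParseGpkgBytes())
    )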
Update: alternative solution. I have a requirement to read a binary-coded file through the Python Apache Beam SDK (Dataflow as the runner). I am trying to replicate the "Reading from a new Source" example from the Beam documentation in my code to read binary files.
My code is given below; can you help me see where it is going wrong?
#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import iobase

#------------Set up BQ parameters-----------------------#
project = 'proj-dev'
dataset_id = 'sandbox'
table_schema_Audit = ('id:STRING,name:STRING')
input = 'gs://bucket/beers.csv'
BUCKET = 'bucket'

#-------------Splitting Of Records----------------------#
class Transaction(iobase.BoundedSource):
    def process(self):
        # Open the GeoPackage file
        import fiona, json
        with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
            parsed_data = [[{"id": json.loads(json.dumps(feature['properties']))['Id'],
                             "name": json.loads(json.dumps(feature['properties']))['Name']}]
                           for feature in input_file]
        return parsed_data

def run(argv=None, save_main_session=True):
    pipeline_args = [
        '--project={0}'.format(project),
        '--job_name=loadstructu',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--region=us-yyyy1',
        '--runner=DataflowRunner',
        '--subnetwork=https://www.googleapis.com/compute/v1/projects/proj-dev/regions/us-yyyy1/subnetworks/xxxxxx-dev-subnet'
    ]
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)

    data_loading = (
        p1
        | 'ReadData' >> beam.io.ReadFromText(Transaction())
    )

    #---------------------Type = load----------------------#
    result = (
        data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
            table='structdata',
            dataset=dataset_id,
            project=project,
            schema=table_schema_Audit,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        ))

    result = p1.run()
    result.wait_until_finish()

if __name__ == '__main__':
    run()
It pops the error given below:
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate, skip_header_lines, delimiter, escapechar, **kwargs)
772 skip_header_lines=skip_header_lines,
773 delimiter=delimiter,
--> 774 escapechar=escapechar)
775
776 def expand(self, pvalue):
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/textio.py in __init__(self, file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, buffer_size, validate, skip_header_lines, header_processor_fns, delimiter, escapechar)
133 min_bundle_size,
134 compression_type=compression_type,
--> 135 validate=validate)
136
137 self._strip_trailing_newlines = strip_trailing_newlines
~/apache-beam-2.43.0/packages/beam/sdks/python/apache_beam/io/filebasedsource.py in __init__(self, file_pattern, min_bundle_size, compression_type, splittable, validate)
110 '%s: file_pattern must be of type string'
111 ' or ValueProvider; got %r instead' %
--> 112 (self.__class__.__name__, file_pattern))
113
114 if isinstance(file_pattern, str):
TypeError: _TextSource: file_pattern must be of type string or ValueProvider; got <__main__.Transaction object at 0x7fcc79ffc250> instead
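From the traceback, the immediate problem is that beam.io.ReadFromText expects a file pattern string (or ValueProvider), not a source object, so passing Transaction() into it cannot work. Staying with the custom-source route would mean implementing the full BoundedSource API and wiring it through beam.io.Read instead; the sketch below is untested and only illustrative (GpkgSource is a made-up name, and it simply reuses the fiona logic from the Transaction class above):

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker

class GpkgSource(iobase.BoundedSource):
    def __init__(self, path):
        self._path = path

    def estimate_size(self):
        return 1  # treat the whole file as one tiny, unsplittable bundle

    def get_range_tracker(self, start_position, stop_position):
        return OffsetRangeTracker(start_position or 0, stop_position or 1)

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        yield iobase.SourceBundle(1, self, 0, 1)

    def read(self, range_tracker):
        if not range_tracker.try_claim(0):
            return
        import fiona
        with fiona.open(self._path, 'r') as input_file:
            for feature in input_file:
                props = dict(feature['properties'])
                yield {"id": props['Id'], "name": props['Name']}

# ...and in the pipeline, instead of beam.io.ReadFromText(Transaction()):
# data_loading = p1 | 'ReadData' >> beam.io.Read(GpkgSource('gs://bucket/2022_data.gpkg'))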
In the end, the solution which worked for me was to add the following two functions before running the Beam pipeline:
#-------------Convert the binary .gpkg file to a txt file----------------------#
def file_create():
    # Open the GeoPackage file and write one 'Id;Name;geometry' row per feature
    import fiona, json
    from shapely.geometry import shape
    f = open("data.txt", "a")
    with fiona.open('gs://bucket/2022_data.gpkg', 'r') as input_file:
        for feature in input_file:
            row = json.loads(json.dumps(feature['properties']))['Id'] + ';' + \
                  json.loads(json.dumps(feature['properties']))['Name'] + ';' + \
                  str(shape(json.loads(json.dumps(feature['geometry'])))) + '\n'
            f.write(row)
    f.close()

#---------------Upload the newly created file to GCS-------------------------#
def upload():
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket('bucket')
    blob = bucket.blob('data.txt')
    blob.upload_from_filename('data.txt')
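The pipeline change itself is not shown above, but a minimal sketch of how the ReadData step inside run() can then consume the uploaded data.txt before the existing WriteToBigQuery step looks like this (parse_row is a helper name I made up; only id and name are kept because the table schema has just those two columns):

#-------------Read the uploaded txt file instead of the custom source--------------#
def parse_row(line):
    # data.txt rows look like 'Id;Name;geometry' (see file_create() above);
    # only id and name are kept, matching table_schema_Audit.
    id_, name, _geometry = line.split(';', 2)
    return {"id": id_, "name": name}

# inside run(), replacing the old 'ReadData' step:
data_loading = (
    p1
    | 'ReadData' >> beam.io.ReadFromText('gs://bucket/data.txt')
    | 'ParseRows' >> beam.Map(parse_row)
)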