I am trying to launch a GCP Dataflow job from a GCP Cloud Function. I have deployed a simple Apache Beam function which works fine, but I get a path error when I try to read an Avro file. The same script runs when I execute it from my local machine with the parameter --runner set to DataflowRunner. Some suggestions say I have to do pip install apache-beam[gcp]. I have already done that locally and it works fine; if I try to install it in GCP, the session times out after a while. Below is my code.
# This script reads all Avro files on a path and prints them
import logging
import os

# import the Apache Beam library
import apache_beam as beam
# import pipeline options
from apache_beam.options.pipeline_options import PipelineOptions

# set log level to INFO
root = logging.getLogger()
root.setLevel(logging.INFO)

PATH = 'gs://mybucket_34545465/cloud_storage_transfer/'


class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        print(element)
        return [len(element)]


beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='bigqueryproject-34545465',
    job_name='testgcsaccessfromcloudfunction',
    temp_location='gs://temp_34545465/temp',
    region='us-central1')


def hello_pubsub(data, context):
    p = beam.Pipeline(options=beam_options)
    # create a PCollection from the Avro files
    transactions = (p
                    | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro(PATH + 'avrofile_*'))
    word_lengths = transactions | beam.ParDo(ComputeWordLengthFn())
    print(word_lengths)
    # run the pipeline
    result = p.run()
    # wait until pipeline processing is complete
    result.wait_until_finish()
I get the following error:
Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1518, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1516, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1502, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/functions_framework/__init__.py", line 171, in view_func
    function(data, context)
  File "/workspace/main.py", line 46, in hello_pubsub
    | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro(PATH + 'avrofile_*'))
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/avroio.py", line 145, in __init__
    self._source = _create_avro_source(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/avroio.py", line 285, in _create_avro_source
    _FastAvroSource(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filebasedsource.py", line 126, in __init__
    self._validate()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/options/value_provider.py", line 193, in _f
    return fnc(self, *args, **kwargs)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filebasedsource.py", line 187, in _validate
    match_result = FileSystems.match([pattern], limits=[1])[0]
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filesystems.py", line 103, in get_filesystem
    raise ValueError(
ValueError: Unable to get filesystem from specified path, please use the correct path or ensure the required dependency is installed, e.g., pip install apache-beam[gcp]. Path specified: gs://mybucket_34545465/cloud_storage_transfer/avrofile_*
Writing and running the pipeline script directly inside the Cloud Function is not the right way to launch a Dataflow job. What worked for me was to first stage the pipeline as a Dataflow template (running the script once with --template_location stages the template in GCS instead of executing the pipeline), and then have the Cloud Function launch that template:
python Dataflow_script_V3.py --runner DataflowRunner --project project-XXXX --staging_location gs://mybucket/staging --temp_location gs://mybucket/temp --region us-central1 --template_location gs://mybucket/templates/templatename
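For reference, here is a minimal sketch of what the templated pipeline script (Dataflow_script_V3.py) could look like. The "inputfile" parameter name is an assumption, chosen to match the template parameters passed by the Cloud Function below; it is declared as a ValueProvider so the staged template can receive the file path at launch time.

# Minimal sketch of a templated pipeline (assumed parameter name: inputfile)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class AvroJobOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ValueProvider argument, so the value is supplied when the template
        # is launched rather than when it is staged
        parser.add_value_provider_argument(
            '--inputfile', type=str, help='GCS path of the Avro file(s) to read')


def run():
    options = PipelineOptions()  # picks up --runner, --project, --template_location, etc.
    avro_options = options.view_as(AvroJobOptions)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Read from Avro' >> beam.io.avroio.ReadFromAvro(avro_options.inputfile)
         | 'Log records' >> beam.Map(print))


if __name__ == '__main__':
    run()

The command above only stages the template at gs://mybucket/templates/templatename; nothing is executed yet. The Cloud Function then simply asks the Dataflow service to launch the staged template: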
def ProcessAvroFile(event, context):
    """Background Cloud Function that launches the Dataflow template.

    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    from googleapiclient.discovery import build
    import datetime

    service = build('dataflow', 'v1b3')

    # Set the following variables to your values.
    current_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    JOBNAME = 'processavro' + current_date
    PROJECT = 'project-XXXX'
    BUCKET = 'mybucket'
    TEMPLATE = 'gs://mybucket/templates/templatename'

    # get the file that triggered the function
    file = event
    print(f"Processing file: {file['name']}.")
    filename = "gs://myfilebucket/" + file['name']

    BODY = {
        "jobName": JOBNAME,
        "parameters": {
            "inputfile": filename,
        },
        "environment": {
            "tempLocation": "gs://{bucket}/dataflow/temp".format(bucket=BUCKET),
        }
    }

    request = service.projects().templates().launch(projectId=PROJECT, gcsPath=TEMPLATE, body=BODY)
    response = request.execute()
    return response
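The Cloud Function itself does not need apache-beam installed at all; it only calls the Dataflow REST API, so its requirements.txt just needs google-api-python-client. As a deployment sketch (assuming a 1st-gen function and the example bucket names used above), the function is deployed with a Cloud Storage trigger so it fires whenever a new Avro file lands in the bucket:

gcloud functions deploy ProcessAvroFile \
    --runtime python38 \
    --trigger-resource myfilebucket \
    --trigger-event google.storage.object.finalize

With that trigger, every object finalized in myfilebucket invokes ProcessAvroFile, which launches a new Dataflow job from the staged template with the uploaded file passed as the inputfile parameter.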