google-cloud-platform, google-cloud-functions, apache-beam, apache-beam-io

apache beam with gcp cloud function


I am trying to create a GCP Dataflow job from a GCP Cloud Function. I have deployed a simple Apache Beam function which works fine, but I get a path error when I try to read an Avro file. The same script runs when I run it from my local machine with the parameter --runner DataflowRunner. Some suggestions say I have to do pip install apache-beam[gcp]. I have already done that locally and it works fine; if I try to install it in GCP, the session times out after some time. Below is my code.


#import print library
# This script will read all avro files on a path and print them 
import logging
import os
#import apache beam library
import apache_beam as beam

#import pipeline options.
from apache_beam.options.pipeline_options import  PipelineOptions

#Set log level to info
root = logging.getLogger()
root.setLevel(logging.INFO)

PATH ='gs://mybucket_34545465/cloud_storage_transfer/'
class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):    
    print(element)   
    return [len(element)]

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='bigqueryproject-34545465',
    job_name='testgcsaccessfromcloudfunction',
    temp_location='gs://temp_34545465/temp',
    region='us-central1')

def hello_pubsub(data, context):
  p = beam.Pipeline(options=beam_options)
  #create a PCollectionfromAVRO file
  transactions = (p        
            | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro(PATH + 'avrofile_*'))
  word_lengths = transactions | beam.ParDo(ComputeWordLengthFn())

  print(word_lengths)

  # Run the pipeline
  result = p.run()
  #  wait until pipeline processing is complete
  result.wait_until_finish()

I get the following error:

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1518, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1516, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/flask/app.py", line 1502, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/functions_framework/__init__.py", line 171, in view_func
    function(data, context)
  File "/workspace/main.py", line 46, in hello_pubsub
    | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro(PATH + 'avrofile_*'))
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/avroio.py", line 145, in __init__
    self._source = _create_avro_source(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/avroio.py", line 285, in _create_avro_source
    _FastAvroSource(
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filebasedsource.py", line 126, in __init__
    self._validate()
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/options/value_provider.py", line 193, in _f
    return fnc(self, *args, **kwargs)
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filebasedsource.py", line 187, in _validate
    match_result = FileSystems.match([pattern], limits=[1])[0]
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/io/filesystems.py", line 103, in get_filesystem
    raise ValueError(
ValueError: Unable to get filesystem from specified path, please use the correct path or ensure the required dependency is installed, e.g., pip install apache-beam[gcp]. Path specified: gs://mybucket_34545465/cloud_storage_transfer/avrofile_*




Solution

  • The approach of creating the pipeline script directly in the Cloud Function is not the correct way to create the Dataflow job. The solution that worked for me is:

    1. Create a Dataflow script locally (a sketch of such a script follows the deploy command below).
    2. Deploy it as a template, for example:

    python Dataflow_script_V3.py --runner DataflowRunner --project project-XXXX --staging_location gs://mybucket/staging --temp_location gs://mybucket/temp --region us-central1 --template_location gs://mybucket/templates/templatename
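
    A minimal sketch of what such a template script could look like, assuming the runtime parameter is named inputfile (to match the launch request in the next step) and that ReadFromAvro accepts the value provider produced by add_value_provider_argument:

        # Dataflow_script_V3.py -- sketch of a classic-template pipeline; the file
        # pattern is a runtime parameter instead of a hard-coded gs:// path.
        import logging

        import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions


        class AvroOptions(PipelineOptions):
            @classmethod
            def _add_argparse_args(cls, parser):
                # Runtime parameter, filled in by the "parameters" block of the
                # templates().launch() request in the Cloud Function below.
                parser.add_value_provider_argument(
                    '--inputfile',
                    type=str,
                    help='Glob of Avro files, e.g. gs://bucket/path/avrofile_*')


        class ComputeWordLengthFn(beam.DoFn):
            def process(self, element):
                logging.info(element)
                yield len(element)


        def run():
            options = AvroOptions()
            p = beam.Pipeline(options=options)
            (p
             | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro(options.inputfile)
             | 'Compute lengths' >> beam.ParDo(ComputeWordLengthFn()))
            # With --template_location set, run() stages the template instead of
            # executing the job.
            p.run()


        if __name__ == '__main__':
            logging.getLogger().setLevel(logging.INFO)
            run()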

    3. Create a Cloud Function that launches the template:
        def ProcessAvroFile(event, context):
            """Triggered by a new object in the Cloud Storage bucket.

            Args:
                event (dict): Event payload.
                context (google.cloud.functions.Context): Metadata for the event.
            """
            from googleapiclient.discovery import build
            import datetime

            service = build('dataflow', 'v1b3')

            # Set the following variables to your values.
            current_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            JOBNAME = 'processavro' + current_date
            PROJECT = 'project-XXXX'
            BUCKET = 'mybucket'
            TEMPLATE = 'gs://mybucket/templates/templatename'

            # Build the input file path from the Cloud Storage event payload.
            file = event
            print(f"Processing file: {file['name']}.")
            filename = "gs://myfilebucket/" + file['name']

            BODY = {
                "jobName": JOBNAME,
                "parameters": {
                    "inputfile": filename,
                },
                "environment": {
                    "tempLocation": "gs://{bucket}/dataflow/temp".format(bucket=BUCKET),
                },
            }

            # Launch a Dataflow job from the staged template.
            request = service.projects().templates().launch(
                projectId=PROJECT, gcsPath=TEMPLATE, body=BODY)
            response = request.execute()

            return response
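
    Note that the function itself does not need apache-beam at all; it only calls the Dataflow REST API through the Google API client. That client is typically not available in the Cloud Functions Python runtime, so it should be declared in a requirements.txt deployed next to main.py, for example:

        # requirements.txt for the Cloud Function
        google-api-python-client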
    
    4. Set a Cloud Storage trigger on the upload bucket so that each new file invokes the function:

        gcloud functions deploy ProcessAvroFile --trigger-bucket=myfilebucket
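
    For reference, the event dict handed to ProcessAvroFile by the storage trigger carries the object name used to build the inputfile parameter; an illustrative (made-up) subset:

        # Illustrative subset of the Cloud Storage event payload (values are made up).
        event = {
            "bucket": "myfilebucket",
            "name": "avrofile_20220101.avro",  # read as gs://myfilebucket/<name> above
        }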