I've been trying to deploy a pipeline on Google Cloud Dataflow. It's been a quite a challenge so far. I'm facing an import issue because I realised that ParDo functions require the requirements.txt to be present if not it will say that it can't find the required module. https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
So I tried fixing the problem by passing in the requirements.txt file, only to be met with a very incomprehensible error message.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from google.cloud.bigtable.row import DirectRow
import datetime
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://tunnel-insight-2-0-dev-291100/dataflow'
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the requirements.txt file
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location
ib.options.recording_duration = '1m'
...
...
pipeline_result = DataflowRunner().run_pipeline(p, options=options)
I've tried to pass requirements using "options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt""
I get this error
---------------------------------------------------------------------------
CalledProcessError Traceback (most recent call last)
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
90 try:
---> 91 out = subprocess.check_output(*args, **kwargs)
92 except OSError:
/opt/conda/lib/python3.7/subprocess.py in check_output(timeout, *popenargs, **kwargs)
410 return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
--> 411 **kwargs).stdout
412
/opt/conda/lib/python3.7/subprocess.py in run(input, capture_output, timeout, check, *popenargs, **kwargs)
511 raise CalledProcessError(retcode, process.args,
--> 512 output=stdout, stderr=stderr)
513 return CompletedProcess(process.args, retcode, stdout, stderr)
CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-12-f018e5c84d08> in <module>
----> 1 pipeline_result = DataflowRunner().run_pipeline(p, options=options)
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
491 environments.DockerEnvironment.from_container_image(
492 apiclient.get_container_image_from_options(options),
--> 493 artifacts=environments.python_sdk_dependencies(options)))
494
495 # This has to be performed before pipeline proto is constructed to make sure
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/environments.py in python_sdk_dependencies(options, tmp_dir)
624 options,
625 tmp_dir,
--> 626 skip_prestaged_dependencies=skip_prestaged_dependencies))
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in create_job_resources(options, temp_dir, build_setup_args, populate_requirements_cache, skip_prestaged_dependencies)
178 populate_requirements_cache if populate_requirements_cache else
179 Stager._populate_requirements_cache)(
--> 180 setup_options.requirements_file, requirements_cache_path)
181 for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
182 resources.append((pkg, os.path.basename(pkg)))
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
234 while True:
235 try:
--> 236 return fun(*args, **kwargs)
237 except Exception as exn: # pylint: disable=broad-except
238 if not retry_filter(exn):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
569 ]
570 _LOGGER.info('Executing command: %s', cmd_args)
--> 571 processes.check_output(cmd_args, stderr=processes.STDOUT)
572
573 @staticmethod
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
97 "Full traceback: {} \n Pip install failed for package: {} \
98 \n Output from execution of subprocess: {}" \
---> 99 .format(traceback.format_exc(), args[0][6], error.output))
100 else:
101 raise RuntimeError("Full trace: {}, \
RuntimeError: Full traceback: Traceback (most recent call last):
File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py", line 91, in check_output
out = subprocess.check_output(*args, **kwargs)
File "/opt/conda/lib/python3.7/subprocess.py", line 411, in check_output
**kwargs).stdout
File "/opt/conda/lib/python3.7/subprocess.py", line 512, in run
output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
Pip install failed for package: -r
Output from execution of subprocess: b'Obtaining file:///root/apache-beam-custom/packages/beam/sdks/python (from -r requirements.txt (line 3))\n Saved /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zip\nCollecting absl-py==0.11.0\n Downloading absl-py-0.11.0.tar.gz (110 kB)\n Saved /tmp/dataflow-requirements-cache/absl-py-0.11.0.tar.gz\nCollecting argon2-cffi==20.1.0\n Downloading argon2-cffi-20.1.0.tar.gz (1.8 MB)\n Installing build dependencies: started\n Installing build dependencies: finished with status \'error\'\n ERROR: Command errored out with exit status 1:\n command: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\'\n cwd: None\n Complete output (85 lines):\n Collecting setuptools>=40.6.0\n Downloading setuptools-51.1.1.tar.gz (2.1 MB)\n Collecting wheel\n Downloading wheel-0.36.2.tar.gz (65 kB)\n Collecting cffi>=1.0\n Downloading cffi-1.14.4.tar.gz (471 kB)\n Collecting pycparser\n Downloading pycparser-2.20.tar.gz (161 kB)\n Skipping wheel build for setuptools, due to binaries being disabled for it.\n Skipping wheel build for wheel, due to binaries being disabled for it.\n Skipping wheel build for cffi, due to binaries being disabled for it.\n Skipping wheel build for pycparser, due to binaries being disabled for it.\n Installing collected packages: setuptools, wheel, pycparser, cffi\n Running setup.py install for setuptools: started\n Running setup.py install for setuptools: finished with status \'done\'\n Running setup.py install for wheel: started\n Running setup.py install for wheel: finished with status \'done\'\n Running setup.py install for pycparser: started\n Running setup.py install for pycparser: finished with status \'done\'\n Running setup.py install for cffi: started\n Running setup.py install for cffi: finished with status \'error\'\n ERROR: Command errored out with exit status 1:\n command: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi\n cwd: /tmp/pip-install-6zs5jguv/cffi/\n Complete output (56 lines):\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n Package libffi was not found in the pkg-config search path.\n Perhaps you should add the directory containing `libffi.pc\'\n to the PKG_CONFIG_PATH environment variable\n No package \'libffi\' found\n running install\n running build\n running build_py\n creating build\n creating build/lib.linux-x86_64-3.7\n creating build/lib.linux-x86_64-3.7/cffi\n copying cffi/setuptools_ext.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/pkgconfig.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/verifier.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/vengine_gen.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/backend_ctypes.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/__init__.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/cffi_opcode.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/error.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/api.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/commontypes.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/ffiplatform.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/lock.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/cparser.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/recompiler.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/vengine_cpy.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/model.py -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/_cffi_include.h -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/parse_c_type.h -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/_embedding.h -> build/lib.linux-x86_64-3.7/cffi\n copying cffi/_cffi_errors.h -> build/lib.linux-x86_64-3.7/cffi\n running build_ext\n building \'_cffi_backend\' extension\n creating build/temp.linux-x86_64-3.7\n creating build/temp.linux-x86_64-3.7/c\n gcc -pthread -B /opt/conda/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -fPIC -DUSE__THREAD -DHAVE_SYNC_SYNCHRONIZE -I/usr/include/ffi -I/usr/include/libffi -I/root/apache-beam-custom/include -I/opt/conda/include/python3.7m -c c/_cffi_backend.c -o build/temp.linux-x86_64-3.7/c/_cffi_backend.o\n c/_cffi_backend.c:15:10: fatal error: ffi.h: No such file or directory\n #include <ffi.h>\n ^~~~~~~\n compilation terminated.\n error: command \'gcc\' failed with exit status 1\n ----------------------------------------\n ERROR: Command errored out with exit status 1: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi Check the logs for full command output.\n WARNING: You are using pip version 20.1.1; however, version 20.3.3 is available.\n You should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n ----------------------------------------\nERROR: Command errored out with exit status 1: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\' Check the logs for full command output.\nWARNING: You are using pip version 20.1.1; however, version 20.3.3 is available.\nYou should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n'
Did I do something wrong?
-------------- EDIT---------------------------------------
Ok, I've got my pipeline to work, but I'm still having a problem with my requirements.txt file which I believe I'm passing in correctly.
My pipeline code:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from google.cloud.bigtable.row import DirectRow
import datetime
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = ''
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the requirements.txt file
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location
ib.options.recording_duration = '1m'
# The Google Cloud PubSub topic for this example.
topic = ""
subscription = ""
output_topic = ""
# Info
project_id = ""
bigtable_instance = ""
bigtable_table_id = ""
class CreateRowFn(beam.DoFn):
def process(self,words):
from google.cloud.bigtable.row import DirectRow
import datetime
direct_row = DirectRow(row_key="phone#4c410523#20190501")
direct_row.set_cell(
"stats_summary",
b"os_build",
b"android",
datetime.datetime.now())
return [direct_row]
p = beam.Pipeline(InteractiveRunner(),options=options)
words = p | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
windowed_words = (words | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
# Writing to BigTable
test = words | beam.ParDo(CreateRowFn()) | WriteToBigTable(
project_id=project_id,
instance_id=bigtable_instance,
table_id=bigtable_table_id)
pipeline_result = DataflowRunner().run_pipeline(p, options=options)
As you can see in "CreateRowFn", I need to import
from google.cloud.bigtable.row import DirectRow
import datetime
Only then this works.
I've passed in requirements.txt as options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt" and I see it on Dataflow console.
If I remove the import statements, I get "in process NameError: name 'DirectRow' is not defined".
Is there anyway to overcome this?
I've found the answer in the FAQs. My mistake was not about how to pass in requirements.txt but how to handle NameErrors
https://cloud.google.com/dataflow/docs/resources/faq
How do I handle NameErrors? If you're getting a NameError when you execute your pipeline using the Dataflow service but not when you execute locally (i.e. using the DirectRunner), your DoFns may be using values in the global namespace that are not available on the Dataflow worker.
By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Dataflow worker.
Notice that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error is regarding a module that should be available in the Python distribution, you can solve this by importing the module locally, where it is used.
For example, instead of:
import re
…
def myfunc():
# use re module
use:
def myfunc():
import re
# use re module
Alternatively, if your DoFns span multiple files, you should use a different approach to packaging your workflow and managing dependencies.
So the conclusion is: It is ok to use import statements within the functions