My Dataflow pipeline is as follows:
import apache_beam as beam
from apache_beam import Pipeline, io
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(
    pipeline_args, streaming=True, save_main_session=True, sdk_location="container"
)
with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read event topic"
        >> io.ReadFromPubSub(topic=input_topic).with_output_types(bytes)
        # Pub/Sub delivers raw bytes, so decode them before transforming.
        | "Convert to string" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | "Transform event"
        >> beam.Map(transform_message, event_name=event_name)
        | "Write to output topic"
        >> beam.Map(publish_to_output_topic)
    )
I'm using Flex Templates to deploy my pipeline. I built the template with the gcloud CLI like so:
gcloud dataflow flex-template build gs://mybucket/templates/dataflow-latest.json \
--image "us-docker.pkg.dev/project_id/dataflow/dataflow:latest" \
--sdk-language "PYTHON"
and I launch the job like so:
gcloud dataflow flex-template run "test-job" \
--template-file-gcs-location "gs://mybucket/templates/dataflow-latest.json" \
--service-account-email "dataflow@project_id.iam.gserviceaccount.com" \
--staging-location "gs://mybucket/staging/" \
--temp-location "gs://mybucket/temp/" \
--parameters event_name="foobuzz" \
--parameters sdk_container_image="us-docker.pkg.dev/project_id/dataflow/dataflowsdk:latest" \
--region "us-central2"
However, my template fails to start and I keep getting this error:
{"severity":"INFO","time":"2023/07/11 17:59:12.176036","line":"python_template.go:64","message":"Using launch args: [/dataflow/template/beam.py --runner=DataflowRunner --region=us-central2 --staging_location=gs://mybucket/staging/ --event_name=foobuzz --project=project_id --job_name=test-job --template_location=gs://my-bucket/staging/template_launches/2023-07-11_10_55_55-17803180896608448008/job_object --service_account_email=dataflow@my_project.iam.gserviceaccount.com --temp_location=gs://mybucket/temp/ --sdk_container_image=us-docker.pkg.dev/project_id/dataflow:latest]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.782216","line":"exec.go:66","message":"┬ T4: \u003cclass 'apache_beam.transforms.core.CallableWrapperDoFn'\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.782528","line":"exec.go:66","message":"└ # T4 [54 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.782851","line":"exec.go:66","message":"┬ D2: \u003cdict object at 0x7fcaee860840\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.783157","line":"exec.go:66","message":"├┬ F1: \u003cfunction Map.\u003clocals\u003e.\u003clambda\u003e at 0x7fcaee86e430\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.783359","line":"exec.go:66","message":"│├┬ F2: \u003cfunction _create_function at 0x7fcb0746e1f0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.783562","line":"exec.go:66","message":"││└ # F2 [34 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784154","line":"exec.go:66","message":"│├┬ Co: \u003ccode object \u003clambda\u003e at 0x7fcaf7968df0, file \"/usr/local/lib/python3.8/site-packages/apache_beam/transforms/core.py\", line 1900\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784366","line":"exec.go:66","message":"││├┬ F2: \u003cfunction _create_code at 0x7fcb0746e280\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784554","line":"exec.go:66","message":"│││└ # F2 [19 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784825","line":"exec.go:66","message":"││└ # Co [156 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785078","line":"exec.go:66","message":"│├┬ D4: \u003cdict object at 0x7fcaf7949300\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785230","line":"exec.go:66","message":"││└ # D4 [38 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785515","line":"exec.go:66","message":"│├┬ Ce2: \u003ccell at 0x7fcaee87a520: function object at 0x7fcaee86e3a0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785693","line":"exec.go:66","message":"││├┬ F2: \u003cfunction _create_cell at 0x7fcb0746ea60\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785883","line":"exec.go:66","message":"│││└ # F2 [19 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786047","line":"exec.go:66","message":"││└ # Ce2 [24 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786211","line":"exec.go:66","message":"│├┬ D2: \u003cdict object at 0x7fcaee860d00\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786379","line":"exec.go:66","message":"││├┬ F1: \u003cfunction run.\u003clocals\u003e.\u003clambda\u003e at 0x7fcaee86e3a0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786567","line":"exec.go:66","message":"│││├┬ Co: \u003ccode object \u003clambda\u003e at 0x7fcb07d62240, file \"/dataflow/template/beam.py\", line 220\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786850","line":"exec.go:66","message":"││││└ # Co [99 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.789579","line":"exec.go:66","message":"│││├┬ D1: \u003cdict object at 0x7fcb07e86fc0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.789761","line":"exec.go:66","message":"││││└ # D1 [22 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.789954","line":"exec.go:66","message":"│││├┬ D2: \u003cdict object at 0x7fcaee85b0c0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790158","line":"exec.go:66","message":"││││├┬ T6: \u003cclass 'apache_beam.typehints.decorators.IOTypeHints'\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790344","line":"exec.go:66","message":"│││││├┬ F2: \u003cfunction _create_namedtuple at 0x7fcb074700d0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790524","line":"exec.go:66","message":"││││││└ # F2 [25 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790718","line":"exec.go:66","message":"│││││└ # T6 [118 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790898","line":"exec.go:66","message":"││││└ # D2 [143 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791050","line":"exec.go:66","message":"│││├┬ D2: \u003cdict object at 0x7fcaee866600\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791219","line":"exec.go:66","message":"││││├┬ D2: \u003cdict object at 0x7fcaef60f080\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791370","line":"exec.go:66","message":"│││││└ # D2 [2 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791529","line":"exec.go:66","message":"││││└ # D2 [63 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791681","line":"exec.go:66","message":"│││└ # F1 [341 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791824","line":"exec.go:66","message":"││└ # D2 [358 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791978","line":"exec.go:66","message":"│├┬ D2: \u003cdict object at 0x7fcaee86f200\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.792141","line":"exec.go:66","message":"││├┬ D2: \u003cdict object at 0x7fcaee800ac0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.792287","line":"exec.go:66","message":"│││└ # D2 [2 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.792453","line":"exec.go:66","message":"││└ # D2 [34 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805605","line":"exec.go:66","message":"Traceback (most recent call last):"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805641","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py\", line 246, in dumps"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805661","line":"exec.go:66","message":" s = dill.dumps(o, byref=settings['dill_byref'])"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805675","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 263, in dumps"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805691","line":"exec.go:66","message":" dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805707","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 235, in dump"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805722","line":"exec.go:66","message":" Pickler(file, protocol, **_kwds).dump(obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805735","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 394, in dump"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805749","line":"exec.go:66","message":" StockPickler.dump(self, obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805766","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 487, in dump"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805780","line":"exec.go:66","message":" self.save(obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805792","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805813","line":"exec.go:66","message":" StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805828","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 603, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805842","line":"exec.go:66","message":" self.save_reduce(obj=obj, *rv)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805854","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 717, in save_reduce"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805870","line":"exec.go:66","message":" save(state)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805883","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805897","line":"exec.go:66","message":" StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805910","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805924","line":"exec.go:66","message":" f(self, obj) # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805937","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py\", line 216, in new_save_module_dict"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805953","line":"exec.go:66","message":" return old_save_module_dict(pickler, obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805965","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 1186, in save_module_dict"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805980","line":"exec.go:66","message":" StockPickler.save_dict(pickler, obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805993","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 971, in save_dict"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806007","line":"exec.go:66","message":" self._batch_setitems(obj.items())"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806019","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 997, in _batch_setitems"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806033","line":"exec.go:66","message":" save(v)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806045","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806059","line":"exec.go:66","message":" StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806072","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806087","line":"exec.go:66","message":" f(self, obj) # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806100","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 1824, in save_function"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806114","line":"exec.go:66","message":" _save_with_postproc(pickler, (_create_function, ("}
{"severity":"INFO","time":"2023/07/11 17:59:27.806127","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 1089, in _save_with_postproc"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806141","line":"exec.go:66","message":" pickler.save_reduce(*reduction)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806153","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 691, in save_reduce"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806167","line":"exec.go:66","message":" save(func)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806178","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806196","line":"exec.go:66","message":" StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806211","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 603, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806224","line":"exec.go:66","message":" self.save_reduce(obj=obj, *rv)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806236","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 692, in save_reduce"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806251","line":"exec.go:66","message":" save(args)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806263","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806278","line":"exec.go:66","message":" StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806291","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806304","line":"exec.go:66","message":" f(self, obj) # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806325","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 886, in save_tuple"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806351","line":"exec.go:66","message":" save(element)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806363","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806378","line":"exec.go:66","message":" StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806391","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806405","line":"exec.go:66","message":" f(self, obj) # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806418","line":"exec.go:66","message":" File \"/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py\", line 170, in save_module"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806464","line":"exec.go:66","message":" dill.dill.log.info('M2: %s' % obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806477","line":"exec.go:66","message":"AttributeError: module 'dill._dill' has no attribute 'log'"}
I'm unsure where to look. It doesn't look like dill or the Beam SDK has had a new release recently, and when I tried this a few days ago it worked. It looks like there is an issue with serializing the Dataflow pipeline.
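Because the launcher wraps every line of Python output in a JSON record, the actual exception is easy to miss in the log above. The following is a minimal sketch, not part of the original pipeline, that strips the JSON wrapper and returns the last message that looks like an exception line. The helper names `messages` and `last_error` are hypothetical, and the sample records are rebuilt from two lines of the log above.

```python
import json


def messages(log_lines):
    """Yield the "message" field of each JSON log record, skipping anything else."""
    for line in log_lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # not a JSON record, ignore it
        if isinstance(record, dict) and "message" in record:
            yield record["message"]


def last_error(log_lines):
    """Return the last message shaped like 'SomeError: details', or None."""
    found = None
    for msg in messages(log_lines):
        head = msg.split(":", 1)[0]
        if ": " in msg and head.isidentifier() and head.endswith(("Error", "Exception")):
            found = msg
    return found


# Two records rebuilt from the end of the log above (fields trimmed for brevity).
sample = [
    json.dumps({"severity": "INFO", "line": "exec.go:66", "message": m})
    for m in (
        " dill.dill.log.info('M2: %s' % obj)",
        "AttributeError: module 'dill._dill' has no attribute 'log'",
    )
]

print(last_error(sample))
```

Feeding the full launcher log through `last_error` in the same way should surface the final `AttributeError` line without the surrounding pickling trace.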
It turns out that something in my pipeline could not be serialized with dill (I'm unsure which part exactly). After going down the rabbit hole and reading this, I was able to get my pipeline running by changing the serialization library from dill to cloudpickle:
pipeline_options = PipelineOptions(
    pipeline_args,
    streaming=True,
    save_main_session=True,
    sdk_location="container",
    pickle_library="cloudpickle",
)
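It should be noted that even though cloudpickle is more expressive than dill, it is still considered experimental (the issue is here). It is my hope that cloudpickle will become the default serialization library; if it does not, the dill version should be kept in line with the Beam SDKs here. The traceback above fails inside Beam's dill_pickler.py on a call to dill.dill.log, which the installed dill does not provide, so a mismatch between the installed dill and the version the Beam SDK expects is a likely cause. Thanks to XQ Hu for pointing this out.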
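One way to check whether the dill version in an image is in line with the Beam SDK is to compare the installed version against the requirement that the installed apache-beam distribution declares. The following is a rough sketch using only the standard library (`importlib.metadata`, available from Python 3.8). The helper names `declared_requirement` and `report` are hypothetical, and the sketch only prints the two values side by side rather than evaluating the version specifier.

```python
import re
from importlib import metadata


def declared_requirement(requirements, package):
    """Return the first requirement string that names `package`, or None."""
    # Match the package name only when it is not a prefix of a longer name.
    pattern = re.compile(rf"^{re.escape(package)}(?![A-Za-z0-9._-])", re.IGNORECASE)
    for req in requirements or []:
        if pattern.match(req.strip()):
            return req.strip()
    return None


def report(dist="apache-beam", package="dill"):
    """Describe what `dist` declares for `package` and what is installed."""
    try:
        reqs = metadata.requires(dist)  # may be None if nothing is declared
        installed = metadata.version(package)
    except metadata.PackageNotFoundError as exc:
        return f"not installed: {exc}"
    declared = declared_requirement(reqs, package)
    return f"{dist} declares {declared!r}; installed {package}=={installed}"


if __name__ == "__main__":
    print(report())
```

Running this inside both the template launcher image and the SDK container image shows whether either one carries a dill release outside the range the installed Beam SDK declares.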