I can run my pipeline without problems locally with a Direct Runner; however, when I deploy it to Dataflow, I get the following error:
"Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 219, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1311, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1322, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 919, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 200, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1401, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 239, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 393, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'my-dataset-id'
projectId: 'my-project-id'
tableId: 'my-table'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Write good logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
/---/
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'string1'
projectId: 'string2'
tableId: 'string3'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey' [while running 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/Map(reify_timestamps)-ptransform-9454']
This is the relevant code of the pipeline:
# Streaming pipeline: read JSON messages from Pub/Sub, decode them,
# stamp each element with its processing time, window into 10-second
# fixed windows, and compute a per-element duration metric.
logs = (p | beam.io.ReadFromPubSub(topic=topic)
# ListProcessor is defined elsewhere; presumably parses the Pub/Sub
# payload into dict records — TODO confirm against its definition.
| "Decode Json" >> beam.ParDo(ListProcessor())
# Uses wall-clock time.time() as the event timestamp (processing time),
# not a timestamp carried in the message itself.
| 'Add Timestamp' >> beam.Map(lambda log: beam.window.TimestampedValue(log, time.time()))
# 10-second fixed (tumbling) windows.
| 'Window into Fixed Intervals' >> beam.WindowInto(beam.window.FixedWindows(10))
# DurationProcessor is defined elsewhere — assumed to add a duration
# field to each record; verify against its definition.
| 'Calculate Pipeline Duration' >> beam.ParDo(DurationProcessor())
)
# Extra table options passed to BigQuery: day-based time partitioning
# on the 'created_at' column.
additional_bq_parameters = {
'timePartitioning': {'type': 'DAY', 'field': 'created_at'}}
# Streaming insert into BigQuery. WriteToBigQuery returns a result object
# whose failed-rows collection is bound to `errors` (not consumed here).
errors = (logs | 'Write logs to BigQuery' >> beam.io.WriteToBigQuery(
# Fully-qualified table spec: project:dataset.table.
'{0}:{1}.{2}'.format(project, collection, table_name),
schema=schema,
additional_bq_parameters=additional_bq_parameters,
# Retry only transient insert failures; permanent failures go to the
# failed-rows output instead of being retried forever.
insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))
Could you help me with this? I have already checked the Beam source code without success. Thank you!
To prevent the error mentioned in this question, the fix is to upgrade to Apache Beam version 2.30.0 or higher, as reported in https://issues.apache.org/jira/browse/BEAM-12079 (the `TableReference` type could not be deterministically encoded by earlier SDK versions).