I can use the external Python engine to read data from a BigQuery table, but I cannot authenticate when I run Apache Beam + Python locally.
The documentation only describes the gcloud CLI option: "When you run locally, your Apache Beam pipeline runs as the Google Cloud account that you configured with the Google Cloud CLI executable. Hence, locally run Apache Beam SDK operations and your Google Cloud account have access to the same files and resources."
I cannot save the contents of the service account key as a file; that would be unsafe in my set-up.
I want to authenticate using an environment variable whose value is the service account key (JSON), as I have already done with bigquery.Client.
I have added method="STREAMING_INSERTS" to avoid using a temporary GCS bucket.
The code I have got so far:
import os
import pandas
import pandas_gbq
from google.cloud import bigquery
from google.oauth2 import service_account
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
assert 'google_key' in os.environ, "Please define a secret called google_key; its value must be the contents of access key in JSON format"
credentials = service_account.Credentials.from_service_account_info(eval("{" + os.environ['google_key'] +"}"))
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
# reading data - client.query() - works fine
query_job = client.query("SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 1")
df = query_job.to_dataframe()
# writing data using load_table_from_dataframe also works fine
# https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-dataframe
quotes_list = [{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
               {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
               ]
df_quotes = pandas.DataFrame(quotes_list)
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField(col, bigquery.enums.SqlTypeNames.STRING) for col in df_quotes.columns],
    write_disposition="WRITE_TRUNCATE",
)
table_id = "dummy_dataset.quotes_table"
job = client.load_table_from_dataframe(df_quotes, table_id, job_config=job_config) # Make an API request.
job.result() # Wait for the job to complete.
table = client.get_table(table_id) # Make an API request.
print(f"Loaded {table.num_rows} rows and {len(table.schema)} columns")
# writing data using pandas_gbq works fine
# https://pandas-gbq.readthedocs.io/en/latest/writing.html#writing-to-an-existing-table
pandas_gbq.to_gbq(df, "dummy_dataset.dummy_table2", credentials.project_id, credentials=credentials, if_exists='replace')
# writing using WriteToBigQuery does not work
beam_options = PipelineOptions()
with beam.Pipeline(options=beam_options) as pipeline:
    quotes = pipeline | beam.Create(quotes_list)
    quotes | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
        table=f'{credentials.project_id}:dummy_dataset.dummy_table',
        ignore_unknown_columns=True,
        schema='source:STRING, quote:STRING',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method="STREAMING_INSERTS")
The errors and warnings that I get when I use beam_options = PipelineOptions() are as follows:
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth._default:No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth._default:No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
ERROR:apache_beam.runners.common:Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
self.gcp_bq_client = client or gcp_bigquery.Client(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
super(Client, self).__init__(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
_ClientProjectMixin.__init__(self, project=project, credentials=credentials)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment.
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
self.gcp_bq_client = client or gcp_bigquery.Client(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
super(Client, self).__init__(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
_ClientProjectMixin.__init__(self, project=project, credentials=credentials)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/runner/BigQueryWrite/main.py", line 27, in <module>
with beam.Pipeline(options=beam_options) as pipeline:
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 612, in __exit__
self.result = self.run()
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 586, in run
return self.runner.run_pipeline(self, self._options)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 466, in run_stages
bundle_results = self._execute_bundle(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 794, in _execute_bundle
self._run_bundle(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1031, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1367, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 384, in push
response = self.worker.do_instruction(request)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
return getattr(self, request_type)(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1071, in process_bundle
op.start()
File "apache_beam/runners/worker/operations.py", line 929, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 931, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 934, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/common.py", line 1510, in apache_beam.runners.common.DoFnRunner.start
File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
self.gcp_bq_client = client or gcp_bigquery.Client(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
super(Client, self).__init__(
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
_ClientProjectMixin.__init__(self, project=project, credentials=credentials)
File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
Solved by creating a temporary file, writing the contents of the service account key into it, and setting the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of that file. This works because, as the traceback shows, Beam's BigQueryWriteFn constructs its own google.cloud.bigquery.Client, which falls back to Application Default Credentials instead of the in-memory credentials object created in the script, and GOOGLE_APPLICATION_CREDENTIALS is the standard way to supply those.
import os
import tempfile

# Write the service account key from the environment variable to a temporary file
# and point GOOGLE_APPLICATION_CREDENTIALS at it so Application Default Credentials can find it.
tf = tempfile.NamedTemporaryFile()
with open(tf.name, 'w') as f:
    f.write("{" + os.environ['google_sa_key'] + "}")
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = tf.name
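If the pipeline still cannot determine a project after this (the warnings above explicitly suggest setting GOOGLE_CLOUD_PROJECT), the project can also be supplied explicitly. A minimal sketch, assuming the credentials object from the question is still in scope; PipelineOptions(project=...) corresponds to Beam's --project pipeline flag:

import os
from apache_beam.options.pipeline_options import PipelineOptions

# Either expose the project through the environment variable that google-auth checks ...
os.environ['GOOGLE_CLOUD_PROJECT'] = credentials.project_id

# ... or pass it directly to the pipeline options (equivalent to the --project flag).
beam_options = PipelineOptions(project=credentials.project_id)

Also note that tempfile.NamedTemporaryFile deletes the file as soon as the tf object is closed or garbage-collected, so keep that reference alive until the pipeline has finished.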