python, authentication, google-bigquery, apache-beam

How to write to a BigQuery table using Apache Beam run locally, without the Google Cloud CLI executable?


I can use an external Python environment to read data from a BigQuery table, but I cannot authenticate when I run Apache Beam + Python locally.

The documentation only covers the CLI option: "When you run locally, your Apache Beam pipeline runs as the Google Cloud account that you configured with the Google Cloud CLI executable. Hence, locally run Apache Beam SDK operations and your Google Cloud account have access to the same files and resources."

I cannot save the contents of the service account key as a file; that would be unsafe in my set-up.

I wanted to authenticate using an environment variable whose value is the service account API key, as I have done with bigquery.Client.

I have added method="STREAMING_INSERTS" to avoid needing a temporary Cloud Storage bucket.

The code I have got so far:

import json
import os
import pandas
import pandas_gbq

from google.cloud import bigquery
from google.oauth2 import service_account

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery

assert 'google_key' in os.environ, "Please define a secret called google_key; its value must be the contents of the service account key in JSON format (without the outer braces)"

# The secret stores the JSON key contents without the outer braces,
# so wrap it back into a JSON object and parse it with json.loads.
credentials = service_account.Credentials.from_service_account_info(json.loads("{" + os.environ['google_key'] + "}"))
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# reading data - client.query() - works fine
query_job = client.query("SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 1")
df = query_job.to_dataframe()
# writing data using load_table_from_dataframe also works fine
# https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-dataframe
quotes_list = [{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
               {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
              ]
df_quotes = pandas.DataFrame(quotes_list)
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField(col, bigquery.enums.SqlTypeNames.STRING) for col in df_quotes.columns],
    write_disposition="WRITE_TRUNCATE",
)
table_id = "dummy_dataset.quotes_table"
job = client.load_table_from_dataframe(df_quotes, table_id, job_config=job_config)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print(f"Loaded {table.num_rows} rows and {len(table.schema)} columns")

# writing data using pandas_gbq works fine
# https://pandas-gbq.readthedocs.io/en/latest/writing.html#writing-to-an-existing-table
pandas_gbq.to_gbq(df, "dummy_dataset.dummy_table2", credentials.project_id, credentials=credentials, if_exists='replace')

# writing using WriteToBigQuery does not work
beam_options = PipelineOptions()

with beam.Pipeline(options=beam_options) as pipeline:
  quotes = pipeline | beam.Create(quotes_list)
  quotes | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=f'{credentials.project_id}:dummy_dataset.dummy_table',
    ignore_unknown_columns=True,
    schema='source:STRING, quote:STRING',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    method="STREAMING_INSERTS")

The errors and warnings that I get when I use beam_options = PipelineOptions() are as follows:

WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: Remote end closed connection without response
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: [Errno -2] Name or service not known
WARNING:google.auth._default:No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
(the same block of warnings is printed a second time)
ERROR:apache_beam.runners.common:Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
    self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
    self.gcp_bq_client = client or gcp_bigquery.Client(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
    super(Client, self).__init__(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
    _ClientProjectMixin.__init__(self, project=project, credentials=credentials)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
    raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment.
Traceback (most recent call last):
  (same traceback as above)
OSError: Project was not passed and could not be determined from the environment.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/runner/BigQueryWrite/main.py", line 27, in <module>
    with beam.Pipeline(options=beam_options) as pipeline:
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 612, in __exit__
    self.result = self.run()
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/pipeline.py", line 586, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 466, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 794, in _execute_bundle
    self._run_bundle(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1031, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1367, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 384, in push
    response = self.worker.do_instruction(request)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
    return getattr(self, request_type)(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1071, in process_bundle
    op.start()
  File "apache_beam/runners/worker/operations.py", line 929, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 931, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 934, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/common.py", line 1510, in apache_beam.runners.common.DoFnRunner.start
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py", line 1494, in start_bundle
    self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 356, in __init__
    self.gcp_bq_client = client or gcp_bigquery.Client(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 238, in __init__
    super(Client, self).__init__(
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 320, in __init__
    _ClientProjectMixin.__init__(self, project=project, credentials=credentials)
  File "/home/runner/BigQueryWrite/.pythonlibs/lib/python3.10/site-packages/google/cloud/client/__init__.py", line 271, in __init__
    raise EnvironmentError(
OSError: Project was not passed and could not be determined from the environment. [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']

Solution

  • Solved by creating a temporary file, writing the contents of the service account API key into it, and setting the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of that temporary file, which is where Application Default Credentials looks for a key.

    import os
    import tempfile

    # Keep a reference to the NamedTemporaryFile: the file on disk is
    # removed as soon as this object is closed or garbage-collected.
    tf = tempfile.NamedTemporaryFile()
    with open(tf.name, 'w') as f:
      # Same secret as above: the JSON key contents without the outer braces.
      f.write("{" + os.environ['google_key'] + "}")
    # Application Default Credentials (used by Beam's BigQuery client)
    # reads the key from the path in this environment variable.
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = tf.name
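
  • For completeness: the "Project was not passed" error can also be avoided by setting the project explicitly on the pipeline options, so that Beam's internal BigQuery client does not have to discover it from the gcloud config or the metadata server. A minimal sketch, assuming the credentials object from the question; the credentials themselves must still be reachable through GOOGLE_APPLICATION_CREDENTIALS as above, because (as the traceback shows) Beam builds its BigQuery client lazily in start_bundle:

    from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

    # Pass the project explicitly instead of relying on environment discovery.
    beam_options = PipelineOptions()
    beam_options.view_as(GoogleCloudOptions).project = credentials.project_id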