When we use a BigQueryIO transform to insert rows, we have an option called:
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
which instructs the pipeline to NOT attempt to create the table if the table doesn't already exist. In my scenario, I want to trap all errors. I attempted to use the following:
var write = mypipline.apply("Write table", BigQueryIO
        .<Employee>write()
        .to(targetTableName_notpresent)
        .withExtendedErrorInfo()
        .withFormatFunction(new EmployeeToTableRow())
        .withSchema(schema)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withTableDescription("My Test Table")
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
This tries to insert rows into a non-existent table, and the result is a RuntimeException. Where I'm stuck is that I don't know how to handle this RuntimeException: I don't believe there is anything here I can surround with a try/catch.
This question is similar to this one:
but I don't think that one got a working answer, and it focused on a missing dataset rather than a missing table.
The exception I get from the fragment above is:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://bigquery.googleapis.com/bigquery/v2/projects/XXXX/datasets/jupyter/tables/not_here/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table XXXX:jupyter.not_here",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table XXXX:jupyter.not_here",
  "status" : "NOT_FOUND"
}
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at .(#126:1)
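You can't add a try/catch directly around the BigQueryIO write in the Beam job when the destination table doesn't exist. Calling apply() only adds the transform to the pipeline graph; the insert, and its 404, happens later while the pipeline runs, which is why the error surfaces from Pipeline.run() in your stack trace.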
I think it's better to delegate this responsibility outside of Beam, or to launch the job only if your table exists.
Usually, a tool like Terraform is responsible for creating the infrastructure before resources are deployed and Beam jobs are run.
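The "launch the job only if the table exists" approach can be sketched as a small pre-flight gate in a launcher script. This is a minimal sketch, assuming the existence check and the job launch are supplied as callables; `launch_if_table_exists`, `table_exists` and `launch_job` are hypothetical names, not part of Beam or any Google library.

```python
import sys
from typing import Callable


def launch_if_table_exists(
    table_id: str,
    table_exists: Callable[[str], bool],
    launch_job: Callable[[], None],
) -> int:
    """Run launch_job only when table_exists(table_id) returns True.

    Returns a process exit code: 0 if the job was launched, 1 otherwise.
    Both callables are hypothetical hooks provided by the caller.
    """
    if not table_exists(table_id):
        # Fail fast before any pipeline is built, instead of letting the
        # 404 surface later as a PipelineExecutionException at run time.
        print(f"Table {table_id} not found; not launching the job.", file=sys.stderr)
        return 1
    launch_job()
    return 0
```

A launcher would typically pass the returned value to `sys.exit(...)`, so a CI step or wrapper script sees a non-zero status when the table is missing and the job was not started.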
If you really need to check that the table exists, you can create:
- a shell script that uses the bq and gcloud CLIs to check for the table before launching the job
- a Python script that checks for the table before launching the job
Python script:
For Python, there is the BigQuery Python client:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to determine existence.
table_id = "your-project.your_dataset.your_table"

try:
    client.get_table(table_id)  # Make an API request.
    print("Table {} already exists.".format(table_id))
except NotFound:
    print("Table {} is not found.".format(table_id))
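The client snippet only prints a message; to actually gate a launch, it can be wrapped in a function that returns a boolean. This is a minimal sketch, assuming a hypothetical `table_exists` helper. The client is passed in, so anything with a `get_table(table_id)` method (such as `bigquery.Client`) works, and the fallback `NotFound` class exists only so the sketch runs without google-cloud-bigquery installed.

```python
from typing import Protocol

try:
    # Exception type the BigQuery client raises for a missing table.
    from google.cloud.exceptions import NotFound
except ImportError:
    # Assumption: stand-in so the sketch runs without google-cloud-bigquery.
    class NotFound(Exception):
        """Stand-in for google.cloud.exceptions.NotFound."""


class TableGetter(Protocol):
    """Anything with a get_table(table_id) method, e.g. bigquery.Client."""

    def get_table(self, table_id: str): ...


def table_exists(client: TableGetter, table_id: str) -> bool:
    """Return True if get_table succeeds, False if it raises NotFound.

    Other errors (permissions, network) are deliberately not swallowed,
    so they still stop the launcher instead of being read as "missing".
    """
    try:
        client.get_table(table_id)  # Makes an API request with a real client.
        return True
    except NotFound:
        return False
```

For example, a launcher could call `table_exists(bigquery.Client(), table_id)` and then `sys.exit(0 if exists else 1)`, so that a shell wrapper only starts the Dataflow job when the check exits with status 0.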
bq shell script:
bq show <project_id>:<dataset_id>.<table_id>
If the table doesn't exist, the command fails; catch that error and don't start the Dataflow job.
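The same `bq show` check can also be driven from Python. This is a minimal sketch, assuming `bq` is installed and authenticated, and that any non-zero exit status means the job should not start (this also covers permission or auth failures, not only a missing table). `to_bq_ref` and `bq_table_exists` are hypothetical helper names; the command runner is injectable so the logic can be tested without calling `bq`.

```python
import subprocess
from typing import Callable, Sequence


def to_bq_ref(table_id: str) -> str:
    """Convert 'project.dataset.table' into bq's 'project:dataset.table' form.

    Assumes a plain three-part ID; raises ValueError otherwise.
    """
    project, dataset, table = table_id.split(".")
    return f"{project}:{dataset}.{table}"


def _run_quietly(cmd: Sequence[str]) -> int:
    # Raises FileNotFoundError if the bq executable is not on PATH.
    return subprocess.run(
        list(cmd), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    ).returncode


def bq_table_exists(
    table_id: str, run: Callable[[Sequence[str]], int] = _run_quietly
) -> bool:
    """Return True if `bq show` exits with status 0 for the table."""
    return run(["bq", "show", to_bq_ref(table_id)]) == 0
```

Treating every non-zero exit as "don't launch" is the conservative choice here: a job that cannot see the table would fail with the same 404 anyway.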