google-bigquery, apache-beam, apache-beam-io

Handling RuntimeException errors in a BigQuery pipeline


When we use a BigQueryIO transform to insert rows, we have an option called:

.withCreateDisposition(CreateDisposition.CREATE_NEVER)

which instructs the pipeline to NOT attempt to create the table if the table doesn't already exist. In my scenario, I want to trap all errors. I attempted to use the following:

var write = mypipline.apply("Write table", BigQueryIO
    .<Employee>write()
    .to(targetTableName_notpresent)
    .withExtendedErrorInfo()
    .withFormatFunction(new EmployeeToTableRow())
    .withSchema(schema)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
    .withTableDescription("My Test Table")
    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));

which tried to insert rows into a non-existent table. What I got was a RuntimeException. Where I am stuck is that I don't know how to handle it: I don't believe there is anything in the transform chain itself that I can surround with a try/catch.
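
The closest thing I can see is wrapping the run() / waitUntilFinish() call itself, roughly as sketched below, but that only fires after the whole pipeline has already failed (it is where the stack trace further down originates), so it reports the error rather than letting me handle it inside the pipeline:

    try {
        mypipline.run().waitUntilFinish();
    } catch (org.apache.beam.sdk.Pipeline.PipelineExecutionException e) {
        // On the DirectRunner, the missing-table 404 surfaces here as the cause.
        System.err.println("Pipeline failed: " + e.getCause());
    }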

This question is similar to this one:

Is it possible to catch a missing dataset java.lang.RuntimeException in a Google Cloud Dataflow pipeline that writes from Pub/Sub to BigQuery?

but I don't think that one got a working answer, and it was focused on a missing dataset rather than a missing table.

My exception from the fragment above is:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://bigquery.googleapis.com/bigquery/v2/projects/XXXX/datasets/jupyter/tables/not_here/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Table XXXX:jupyter.not_here",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Table XXXX:jupyter.not_here",
  "status" : "NOT_FOUND"
}
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at .(#126:1)

Solution

  • You can't add a try/catch directly around the BigQueryIO transform in the Beam job to handle the case where the destination table doesn't exist.

    I think it's better to delegate this responsibility outside of Beam, or to launch the job only if your table exists.

    Usually a tool like Terraform is responsible for creating the infrastructure before you deploy resources and run Beam jobs.

    If it's mandatory for you to check the existence of the table before launching the job, you can do it with one of the following (a Java sketch of the same check from the Beam driver follows these examples):

    Python script:

    For Python there is the BigQuery Python client:

    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound
    
    client = bigquery.Client()
    
    # TODO(developer): Set table_id to the ID of the table to determine existence.
    # table_id = "your-project.your_dataset.your_table"
    
    try:
        client.get_table(table_id)  # Make an API request.
        print("Table {} already exists.".format(table_id))
    except NotFound:
        print("Table {} is not found.".format(table_id))
    

    BQ shell command:

    bq show <project_id>:<dataset_id>.<table_id>
    

    If the table doesn't exist, the command fails with a "Not found" error and a non-zero exit code; catch that and do not start the Dataflow job.
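
    Java sketch (same check from the Beam driver):

    If you prefer to keep the check in the same Java program that builds the Beam pipeline, a minimal sketch with the google-cloud-bigquery client could look like the following, assuming that library is on the classpath. The class name and the project/dataset/table values are placeholders to replace with your own:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.Table;
    import com.google.cloud.bigquery.TableId;

    public class CheckTableExists {
        public static void main(String[] args) {
            // Placeholders: replace with your project, dataset and table.
            TableId tableId = TableId.of("your-project", "your_dataset", "your_table");

            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            Table table = bigquery.getTable(tableId); // returns null if the table doesn't exist

            if (table == null) {
                System.err.println("Table not found: " + tableId + ", not launching the Dataflow job.");
                System.exit(1);
            }

            System.out.println("Table exists: " + tableId);
            // Safe to build and run the Beam pipeline from here.
        }
    }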