google-bigquery

How to update rows in BigQuery from a list containing arrays of structs using a parameterized SQL query in Python


I have a Cloud Function in Google Cloud that uses streaming inserts to load data about images into BigQuery. Each image is broken up into different sections, so there is one row in the table for each section of an image. Later on I want to update these rows with some added information using a single SQL query, but my table contains arrays of structs. I currently generate the updated information in Python, save each row as a dictionary, and build a list containing all the dictionaries I want to update. Now I want to use this list of row updates to update the related rows in my table with a parameterized SQL query. The data looks like this:

row_updates = [
    {'userID': 'user1', 'timestamp': 1678755601, 'x': 89.01, 'y': 101.01, 'z': 6,
     'stack_num': 1, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]},
    {'userID': 'user1', 'timestamp': 1678755601, 'x': 109.01, 'y': 102.01, 'z': 6,
     'stack_num': 2, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]},
]
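
For context, the original per-section rows are written by the Cloud Function with streaming inserts, roughly like the sketch below (the table ID is hypothetical and the row shape is inferred from the schema that follows):

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical table ID; substitute your own project, dataset and table.
table_id = "my-project.my_dataset.image_sections"

# One streamed row per image section; eyeDetection starts empty and is
# filled in later by the update query this question is about.
rows_to_insert = [
    {"userID": "user1", "timestamp": 1678755601, "x": 89.01, "y": 101.01,
     "z": 6, "stack_num": 1, "eyeDetection": []},
]

errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Streaming insert errors: {errors}")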

My table is partitioned on timestamp, clustered on userID, and contains a few scalar fields plus an array of structs, with the following schema:

    Level  Field name    Type       Mode
    1      userID        STRING     REQUIRED
    1      timestamp     TIMESTAMP  REQUIRED
    1      x             FLOAT      NULLABLE
    1      y             FLOAT      NULLABLE
    1      z             FLOAT      NULLABLE
    1      stack_num     INTEGER    NULLABLE
    1      eyeDetection  RECORD     REPEATED
    2      eyeDetected   STRING     NULLABLE
    2      dateComputed  INTEGER    NULLABLE
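
In client-library terms, that schema corresponds to a nested SchemaField definition along these lines (a sketch; eyeDetection is the ARRAY<STRUCT<...>> column):

from google.cloud import bigquery

# Sketch of the table schema above; eyeDetection is a RECORD field with
# mode REPEATED, i.e. an array of structs.
schema = [
    bigquery.SchemaField("userID", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("x", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("y", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("z", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("stack_num", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField(
        "eyeDetection", "RECORD", mode="REPEATED",
        fields=[
            bigquery.SchemaField("eyeDetected", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("dateComputed", "INTEGER", mode="NULLABLE"),
        ],
    ),
]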

I don't fully understand how to properly set up the query_parameters to send my specific data, or how to process it in SQL. I have read the documentation here https://cloud.google.com/bigquery/docs/parameterized-queries and looked through the relevant client library class docs https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.query.ArrayQueryParameter, but I can't get it to work with my data. My goal was to use ArrayQueryParameter to load my list and then UNNEST and match it all in the SQL query, but I am not sure how to do that or whether it is even possible. Here is my code:

from google.cloud import bigquery


client = bigquery.Client()

row_updates = [
    {'userID': 'user1', 'timestamp': 1678755601, 'x': 89.01, 'y': 101.01, 'z': 6,
     'stack_num': 1, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]},
    {'userID': 'user1', 'timestamp': 1678755601, 'x': 109.01, 'y': 102.01, 'z': 6,
     'stack_num': 2, 'eyeDetection': [{'eyeDetected': 'true', 'dateComputed': 1678755100}]},
]

# Placeholder dataset and table names
dataset_name = "my_dataset"
table_name = "my_table"

update_query = f"""
    UPDATE `{dataset_name}.{table_name}` main
    SET
        main.x = TEMP.x,
        main.y = TEMP.y,
        main.z = TEMP.z,
        main.eyeDetection = TEMP.eyeDetection
    FROM UNNEST(@row_updates) as TEMP
    WHERE main.userID = TEMP.userID AND
    main.timestamp = TEMP.timestamp AND
    main.stack_num = TEMP.stack_num;
"""

query_params = [
    bigquery.ArrayQueryParameter(
        "row_updates",
        "STRUCT<userID STRING, timestamp TIMESTAMP, x FLOAT64, y FLOAT64, z FLOAT64, stack_num INT64, eyeDetection ARRAY<STRUCT<eyeDetected STRING, dateComputed INT64>>>",
        row_updates
    )
]

# Execute the parameterized update query
job_config = bigquery.QueryJobConfig(query_parameters=query_params)
client.query(update_query, job_config=job_config).result()

When I run this I get:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\client.py", line 3334, in query
    return _job_helpers.query_jobs_insert(
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\_job_helpers.py", line 114, in query_jobs_insert
    future = do_query()
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\_job_helpers.py", line 91, in do_query
    query_job._begin(retry=retry, timeout=timeout)
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\job\query.py", line 1298, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\job\base.py", line 510, in _begin
    api_response = client._call_api(
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\bigquery\client.py", line 759, in _call_api
    return call()
    return retry_target(
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\api_core\retry.py", line 190, in retry_target
    return target()
  File "C:\Users\dsuit\Documents\files\.virtualenvs\repo-env2\lib\site-packages\google\cloud\_http\__init__.py", line 494, in api_request
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/my-project-test/jobs?prettyPrint=false: Invalid value for type: STRUCT<userID STRING, timestamp TIMESTAMP, x FLOAT64, y FLOAT64, z FLOAT64, stack_num INT64, eyeDetection ARRAY<STRUCT<eyeDetected STRING, dateComputed INT64>>> is not a valid value

If I modify the query_params to use StructQueryParameter and ScalarQueryParameter objects, I can only supply the values for a single row at a time. I am open to other solutions as long as they allow all the updates from the list to happen in a single query. I don't want to load into temp tables and merge from there if it can be avoided.
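
For reference, the single-row attempt mentioned above looks roughly like this with StructQueryParameter and ScalarQueryParameter (a sketch; the repeated eyeDetection record is omitted because it is the part that does not map cleanly onto one scalar sub-parameter per field):

from datetime import datetime, timezone
from google.cloud import bigquery

# One row expressed as a struct parameter. This only covers a single row,
# which is the limitation described above.
single_row = bigquery.StructQueryParameter(
    "row_update",
    bigquery.ScalarQueryParameter("userID", "STRING", "user1"),
    bigquery.ScalarQueryParameter(
        "timestamp", "TIMESTAMP", datetime.fromtimestamp(1678755601, tz=timezone.utc)
    ),
    bigquery.ScalarQueryParameter("x", "FLOAT64", 89.01),
    bigquery.ScalarQueryParameter("y", "FLOAT64", 101.01),
    bigquery.ScalarQueryParameter("z", "FLOAT64", 6.0),
    bigquery.ScalarQueryParameter("stack_num", "INT64", 1),
)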


Solution

  • I recently decided to revisit this problem now that I have some more experience with BigQuery, and I figured out a solution. I restructured my table to use a single NULLABLE RECORD with multiple fields instead of an array of STRUCTs (a REPEATED RECORD). This way I can simply overwrite the entire eyeDetection field with the data from my list. I then build a dynamically generated query string from the data in the list and feed it to a MERGE DML statement to update all the rows at once. For the example above it looks like this:

    MERGE INTO {table_name} AS target
    USING (SELECT * FROM UNNEST([{row_updates}])) AS source
    ON target.userID = source.userID
       AND target.timestamp = source.timestamp
       AND target.stack_num = source.stack_num
    WHEN MATCHED THEN
      UPDATE SET eyeDetection = source.eyeDetection;
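
    The {row_updates} placeholder is filled with one STRUCT(...) literal per dictionary in the list (after the restructure, each dictionary holds a single eyeDetection record rather than an array). A rough sketch of that string generation, using a row_updates list shaped like the one in the question except for that change, and with placeholder dataset/table names, looks like the code below. Note that building SQL from strings bypasses parameterization, so the values have to be trusted or escaped:

        def build_struct_literal(row):
            # One STRUCT literal per row; assumes the restructured schema where
            # eyeDetection is a single NULLABLE RECORD rather than a REPEATED one.
            eye = row['eyeDetection']
            return (
                "STRUCT("
                f"'{row['userID']}' AS userID, "
                f"TIMESTAMP_SECONDS({row['timestamp']}) AS timestamp, "
                f"{row['stack_num']} AS stack_num, "
                f"STRUCT('{eye['eyeDetected']}' AS eyeDetected, "
                f"{eye['dateComputed']} AS dateComputed) AS eyeDetection)"
            )

        row_updates_sql = ", ".join(build_struct_literal(row) for row in row_updates)

        merge_query = f"""
            MERGE INTO `{dataset_name}.{table_name}` AS target
            USING (SELECT * FROM UNNEST([{row_updates_sql}])) AS source
            ON target.userID = source.userID
               AND target.timestamp = source.timestamp
               AND target.stack_num = source.stack_num
            WHEN MATCHED THEN
              UPDATE SET eyeDetection = source.eyeDetection;
        """
        client.query(merge_query).result()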
    

    The big gotcha with this method is that you cannot run DML statements against rows that are still in the streaming buffer. Updates may have to wait upwards of 90 minutes before being attempted, unless you want to use staging/temp tables.
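
    One way to handle that wait (a sketch; the streaming buffer statistics are only an estimate) is to check whether the table still has a streaming buffer before issuing the MERGE:

        table = client.get_table(f"{dataset_name}.{table_name}")
        if table.streaming_buffer is None:
            # Nothing pending in the streaming buffer, so DML against these rows is allowed.
            client.query(merge_query).result()
        else:
            # Rows are still buffered; retry later (or fall back to a staging table + MERGE).
            print("Streaming buffer not yet flushed; retrying later.")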