sqlgoogle-bigquerydataform

Incremental update of BigQuery table using Dataform with multiple conditions


I have created a BigQuery table using the following code in Dataform:

config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"

${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}

That work just fine. But, I want to schedule daily updates doing the following thing:

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"

${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}

but this doesn't work as I get the following error:

enter image description here

So, from the error message I realize that what I am doing actually creates 2 where statements. I therefore tried to have the filter inside the incremental clause with an AND but this doesn't work either. I have found an addition to the incremental update of tables called updatePartitionFilter but I am not sure whether it can be used for this.

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
        "storeName like '%XXXX%'",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}

Is this the right way to do this? Since I am updating by day, I have no way of testing this before tomorrow. I don't have time to wait.


Solution

  • To build on Serge's answer, the ${when(incremental(), 'WHERE ..,')} block simply appends the specified WHERE clause to the executed SQL. A helpful video explaining how the executed SQL is compiled can be found here.

    In your initial example, you already had a WHERE clause specified, so this was adding a second WHERE clause, resulting in the error in the executed query where it was unexpectedly finding the keyword WHERE.

    The solution that Serge has proposed works, as this will add the WHERE clause and then the extra criteria with AND afterwards, which (when incrementing the table) will compile the below:

    INSERT INTO `<target_table>`
    SELECT DATE(timestamp) AS Dates, *
    FROM `<mytable in BQ>`
    WHERE timestamp > (select max(timestamp) from `<target_table>`
    AND storeName like "%XXXX%"
    

    However I believe this would fail to include the filter on storeName for the initial build of the <target_table>, as when the table doesn't yet exist, the ${when(incremental(),'WHERE ...')} clause is not appended. This would leave the compiled SQL as below:

    CREATE OR REPLACE TABLE `<target_table>` AS
    SELECT DATE(timestamp) AS Dates, *
    FROM `<mytable in BQ>`
    -- NOTE NO `WHERE` CLAUSE
    

    A better way could be to change the WHERE to an AND in the ${when(incremental(),...)} logic, as below:

    config {
      type: "incremental",
      bigquery: {
        partitionBy: "DATE(timestamp)",
        clusterBy: ["itemName",'StoreName']
      }
    }
    
    SELECT DATE(timestamp) as Dates, *
    from `<mytable in BQ>`
    where storeName like "%XXXX%"
    
    ${when(incremental(), `AND timestamp > (select max(timestamp) from ${self()})`)}
    

    This would execute the query successfully with the filter on storeName when creating the table initially, and then for any subsequent runs would execute the below compiled SQL to increment:

    INSERT INTO `<target_table>`
    SELECT DATE(timestamp) AS Dates, *
    FROM `<mytable in BQ>`
    WHERE storeName like "%XXXX%"
    AND timestamp > (select max(timestamp) from `<target_table>`
    

    Hopefully that makes sense! (This is my first ever response on SO so apologies for any dodgy formatting...)

    Edit: Amended the second code block example as I hadn't realised that Serge had included the filter on storeName within the ${when(incremental(),...)} config.