sql · google-bigquery · dbt · incremental-load

Is it good practice to use update_timestamp for incremental loading in dbt with the insert_overwrite strategy?


Suppose I have a dbt/BigQuery model reading from a simple upstream model that is partitioned by day on "partition_date" and clustered by "update_timestamp", and that looks like the following (for the example I show one line per partition_date, but there would be many more lines per partition_date, all sharing the same update_timestamp within a given partition):

Upstream table

SK | column1       | update_timestamp        | partition_date
---|---------------|-------------------------|---------------
1  | value20200101 | 2020-01-01 20:00:00 UTC | 2020-01-01
2  | value20200102 | 2020-01-02 20:00:00 UTC | 2020-01-02
...
w  | value20220610 | 2022-06-10 20:00:00 UTC | 2022-06-10
x  | value20220611 | 2022-06-11 20:00:00 UTC | 2022-06-11
...
y  | value20240703 | 2024-07-03 20:00:00 UTC | 2024-07-03
z  | value20240704 | 2024-07-04 20:00:00 UTC | 2024-07-04
...
...

And suppose that when the upstream table does a full refresh to update "column1", the "update_timestamp" of every record is set to the timestamp of the full refresh, while "partition_date" is left unchanged.

Suppose my dbt/BigQuery model is incremental, uses the insert_overwrite strategy, and is partitioned by "partition_date" with monthly granularity (note that even though partition_date is still a date in my table, all the dates of the same month land in the same partition because of the monthly granularity):

My table

SK | column1       | update_timestamp        | partition_date
---|---------------|-------------------------|---------------
1  | value20200101 | 2020-01-01 20:00:00 UTC | 2020-01-01
2  | value20200102 | 2020-01-02 20:00:00 UTC | 2020-01-02
...
w  | value20220610 | 2022-06-10 20:00:00 UTC | 2022-06-10
x  | value20220611 | 2022-06-11 20:00:00 UTC | 2022-06-11
...
y  | value20240703 | 2024-07-03 20:00:00 UTC | 2024-07-03
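A model like the one described above could be configured along these lines in dbt-bigquery (a sketch; the column name comes from the example above):

```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        'field': 'partition_date',
        'data_type': 'date',
        'granularity': 'month'
    }
) }}
```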

I consider two strategies to build my table incrementally: one based on "update_timestamp" and one based on "partition_date" of the upstream table.

Using "update_timestamp": when my model runs at "2024-07-04 23:00:00 UTC", for example, I can take the latest value previously retrieved for that column, i.e. "2024-07-03 20:00:00 UTC", truncate it to the first day of the month at midnight, i.e. "2024-07-01 00:00:00 UTC", and fetch all rows of the upstream table where "update_timestamp >= 2024-07-01 00:00:00 UTC", so that I can recreate my monthly partition "2024-07" with the insert_overwrite strategy.

With the "update_timestamp" strategy, suppose the upstream table does a full refresh at "2024-07-04 22:00:00 UTC" to change the values of "column1" (e.g. uppercasing them, so VALUE20200101 instead of value20200101). All records of the upstream table then have "update_timestamp = 2024-07-04 22:00:00 UTC", so when my model runs at "2024-07-04 23:00:00 UTC" I detect all the changes and fetch all the updated records, since every record satisfies "update_timestamp >= 2024-07-01 00:00:00 UTC"; these records are written into new monthly partitions that overwrite the old ones in my table. The advantage is that I detect every change in the upstream table. The problem is that every run from "2024-07-04 23:00:00 UTC" until "2024-08-01 23:00:00 UTC" then performs an "automatic full refresh" of my table, scanning the entire upstream table each time, which can cost too much if the upstream table is big and can have other negative consequences.

Using "partition_date": when my model runs at "2024-07-04 23:00:00 UTC", for example, I can take the latest value previously retrieved for that column, i.e. "2024-07-03", truncate it to the first day of the month, i.e. "2024-07-01", and fetch all rows of the upstream table where "partition_date >= 2024-07-01", so that I can recreate my monthly partition "2024-07" with the insert_overwrite strategy.
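Both variants compute the same kind of lower bound: truncate the last value retrieved to the start of its month (`DATE_TRUNC` / `TIMESTAMP_TRUNC` in BigQuery). A minimal Python sketch of that truncation, using the dates from the example (function names are mine):

```python
from datetime import date, datetime, timezone

def month_floor_date(d: date) -> date:
    """First day of the month containing d (lower bound on partition_date)."""
    return d.replace(day=1)

def month_floor_timestamp(ts: datetime) -> datetime:
    """Midnight on the first day of the month containing ts
    (lower bound on update_timestamp)."""
    return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

# Last values retrieved before the run at 2024-07-04 23:00:00 UTC:
month_floor_date(date(2024, 7, 3))            # returns date(2024, 7, 1)
month_floor_timestamp(
    datetime(2024, 7, 3, 20, 0, tzinfo=timezone.utc)
)                                             # returns 2024-07-01 00:00:00 UTC
```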

With the "partition_date" strategy, if the upstream table does a full refresh at "2024-07-04 22:00:00 UTC", I cannot detect the changes when my model runs at "2024-07-04 23:00:00 UTC", since the "partition_date" values in the upstream table do not change; I still get only the records satisfying "partition_date >= 2024-07-01". The problem is that I miss changes in the upstream table, so I might expose stale data until I launch a manual full refresh of my model. The advantage is the cost: a single manual full refresh of my table is enough.

Which strategy should be preferred? Can either strategy be enhanced to mitigate its downsides?


Solution

  • In the end, I went for a hybrid approach: I use "partition_date" of the upstream table to do the incremental loading, and "update_timestamp" of the upstream table to detect when a full refresh has been done upstream, so that I can do a full refresh on my side too.

    Note that this solution works because I know my upstream table cannot partially refresh old records: it either appends new records or does a full refresh that updates all records.

    But I guess there are other approaches combining "partition_date" and "update_timestamp" that could handle a partial refresh of old records.
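The full-refresh detection itself reduces to a count comparison. A minimal Python sketch of that decision (the function name is mine; the two counts mirror the variables in the model code below):

```python
def should_full_refresh(count_total: int, count_new_only: int) -> bool:
    """Decide whether the upstream table was fully refreshed since our last run.

    count_total:    number of records in the upstream table
    count_new_only: records whose update_timestamp is newer than the max
                    update_timestamp loaded on the previous run
    """
    # A normal incremental run only sees a few new records; after an upstream
    # full refresh, every record carries a fresh update_timestamp.
    return count_new_only >= count_total

should_full_refresh(1_000_000, 1_500)      # False: normal incremental run
should_full_refresh(1_000_000, 1_000_000)  # True: upstream full refresh
```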

    The dbt code of my model looks like this:

    -- default to a full refresh; queries against {{ this }} are only safe once
    -- the model exists and dbt is actually executing (not parsing)
    {% set should_full_refresh_my_model = true %}
    
    {%- if execute and is_incremental() %}
        -- max update_timestamp retrieved from the upstream table during the last run of my model
        {% set max_update_timestamp_query %} SELECT MAX(update_timestamp) FROM {{ this }} {% endset %}
        {% set max_update_timestamp = run_query(max_update_timestamp_query).columns[0].values()[0] %}
    
        -- total count of records in the upstream table
        {% set count_total_query %} SELECT COUNT(*) FROM {{ source(source_schema, source_table) }} {% endset %}
        {% set count_total = run_query(count_total_query).columns[0].values()[0] %}
    
        -- count of records in the upstream table updated after the last run of my model
        {% set count_new_only_query %} SELECT COUNT(*) FROM {{ source(source_schema, source_table) }} WHERE update_timestamp > TIMESTAMP '{{ max_update_timestamp }}' {% endset %}
        {% set count_new_only = run_query(count_new_only_query).columns[0].values()[0] %}
    
        -- if every record of the upstream table was updated after the last run, do a full refresh
        {% set should_full_refresh_my_model = count_new_only >= count_total %}
    
        {%- if not should_full_refresh_my_model %}
            -- first day of the most recent monthly partition already loaded
            {% set my_last_partition_query %} SELECT DATE_TRUNC(MAX(partition_date), MONTH) FROM {{ this }} {% endset %}
            {% set my_last_partition = run_query(my_last_partition_query).columns[0].values()[0] %}
        {%- endif %}
    {%- endif %}
    
    SELECT
        *
    FROM {{ source(source_schema, source_table) }}
    {%- if is_incremental() and not should_full_refresh_my_model %}
        WHERE partition_date >= DATE '{{ my_last_partition }}'
    {%- endif %}