
Overriding dbt snapshot table generation


Our client wants to create a Power BI report that relies on an incrementally refreshed table. After some experimenting, they settled on the columns the report needs: dbt_valid_from, dbt_valid_to and dbt_valid_to_or_from (dbt_valid_to when it is set, otherwise dbt_valid_from).

After some searching, I found that a dbt project can override dbt's built-in snapshot macros with its own.

I implemented the following:

The snapshot is defined in dummy_table_hist.sql, in a subfolder of the dbt project:

{% snapshot dummy_table_hist %}

    {{
        config(
            target_schema='dummy_schema',
            strategy='check',
            check_cols=['hash_diff'],
            unique_key='hash_id',
            invalidate_hard_deletes=True
        )
    }}

SELECT 
    *
FROM {{ source('bi_common', 'dummy_table') }}

{% endsnapshot %}
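As a rough mental model of what this config asks dbt to do on each run (a sketch, not dbt's actual implementation): rows are matched on unique_key (hash_id), a row counts as changed when any check_cols column (hash_diff) differs from the open snapshot version, and with invalidate_hard_deletes=True, keys that disappeared from the source are closed out. The function name classify, the dict-shaped rows and the sample keys are illustrative assumptions.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def classify(
    source: List[Row],
    current: List[Row],
    unique_key: str = "hash_id",
    check_cols: Tuple[str, ...] = ("hash_diff",),
    invalidate_hard_deletes: bool = True,
) -> Dict[str, List[object]]:
    """Hypothetical model of how a 'check' snapshot sorts rows (not dbt's code).

    `current` holds the snapshot rows whose dbt_valid_to is still null.
    """
    current_by_key = {row[unique_key]: row for row in current}
    source_keys = {row[unique_key] for row in source}
    result: Dict[str, List[object]] = {"insert": [], "update": [], "delete": []}

    for row in source:
        key = row[unique_key]
        old = current_by_key.get(key)
        if old is None:
            # Brand-new key: only a new version is inserted.
            result["insert"].append(key)
        elif any(old[col] != row[col] for col in check_cols):
            # Changed row: the old version is closed AND a new version inserted.
            result["update"].append(key)
            result["insert"].append(key)

    if invalidate_hard_deletes:
        # Keys still open in the snapshot but missing from the source get closed.
        result["delete"] = [k for k in current_by_key if k not in source_keys]
    return result


current = [{"hash_id": "a", "hash_diff": "h1"}, {"hash_id": "b", "hash_diff": "h2"}]
source = [{"hash_id": "a", "hash_diff": "h1-changed"}, {"hash_id": "c", "hash_diff": "h3"}]
print(classify(source, current))
# {'insert': ['a', 'c'], 'update': ['a'], 'delete': ['b']}
```

A changed key appears twice, once to close the old version and once to insert the new one, which is why a snapshot run can produce both an update and an insert for the same hash_id.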

A helpers.sql file overrides the snapshot table generation logic:

{% macro default__build_snapshot_table(strategy, sql) %}
 
    select *, COALESCE(dbt_valid_to, dbt_valid_from) as dbt_valid_to_or_from
      from (
        select *,
            {{ strategy.scd_id }} as dbt_scd_id,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
        from (
            {{ sql }}
        ) sbq
    )
 
{% endmacro %}
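To see what this override produces on the first run, the same SQL shape can be run against SQLite from Python. This is only an approximation: SQLite stands in for Redshift, the placeholder string 'T1' stands in for strategy.updated_at (for the check strategy this is the snapshot time), dbt_scd_id is left out, and the id/amount rows are made up. Because nullif(x, x) always yields NULL, dbt_valid_to starts out empty and COALESCE falls back to dbt_valid_from.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical stand-in for bi_common.dummy_table.
conn.execute("create table dummy_table (id integer, amount integer)")
conn.executemany("insert into dummy_table values (?, ?)", [(1, 1), (2, 5)])

# Same shape as the overridden build_snapshot_table; 'T1' is a placeholder
# for the rendered {{ strategy.updated_at }} expression.
first_run_sql = """
    select *, coalesce(dbt_valid_to, dbt_valid_from) as dbt_valid_to_or_from
    from (
        select *,
            'T1' as dbt_updated_at,
            'T1' as dbt_valid_from,
            nullif('T1', 'T1') as dbt_valid_to
        from (select * from dummy_table) sbq
    )
    order by id
"""
for row in conn.execute(first_run_sql):
    print(row)
# (1, 1, 'T1', 'T1', None, 'T1')
# (2, 5, 'T1', 'T1', None, 'T1')
```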

The initial data is as follows:

After creating dummy_schema.dummy_table and running the dbt snapshot command to create the dummy_schema.dummy_table_hist table, I changed amount to 2 for the row where id = 1. After running dbt snapshot again, the change shows up in dummy_schema.dummy_table_hist, but with two problems:

  1. When both dbt_valid_from and dbt_valid_to are filled, dbt_valid_to_or_from holds the dbt_valid_from value instead of the dbt_valid_to value.
  2. For newly inserted records, dbt_valid_to_or_from is NULL. (On the first dbt snapshot run it is populated correctly.)
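Both symptoms can be reproduced outside dbt. A likely explanation: the build_snapshot_table override only runs while the snapshot table does not exist yet. Afterwards, dbt builds a staging table and merges it in, and the stock merge only updates dbt_valid_to on the closed version and inserts only the columns the staging table has. The SQLite sketch below imitates that merge with a hand-written, simplified staging table (placeholder timestamps T1/T2, made-up scd ids); it approximates the generated SQL rather than reproducing it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Snapshot table after the first run (hypothetical values: id 1 had amount 1).
conn.execute("""
    create table dummy_table_hist (
        id integer, amount integer, dbt_scd_id text,
        dbt_valid_from text, dbt_valid_to text, dbt_valid_to_or_from text)
""")
conn.execute("insert into dummy_table_hist values (1, 1, 'scd-1', 'T1', null, 'T1')")

# Simplified staging table as the stock staging macro would shape it:
# it has no dbt_valid_to_or_from column at all.
conn.execute("""
    create table staging (
        dbt_change_type text, id integer, amount integer, dbt_scd_id text,
        dbt_valid_from text, dbt_valid_to text)
""")
conn.executemany("insert into staging values (?, ?, ?, ?, ?, ?)", [
    ("update", 1, 2, "scd-1", "T2", "T2"),  # closes the old version
    ("insert", 1, 2, "scd-2", "T2", None),  # new version with amount = 2
])

# Stock-style merge, step 1: only dbt_valid_to is updated ...
conn.execute("""
    update dummy_table_hist
    set dbt_valid_to = (select s.dbt_valid_to from staging s
                        where s.dbt_scd_id = dummy_table_hist.dbt_scd_id
                          and s.dbt_change_type in ('update', 'delete'))
    where dbt_valid_to is null
      and dbt_scd_id in (select dbt_scd_id from staging
                         where dbt_change_type in ('update', 'delete'))
""")
# ... step 2: only the staging table's columns are inserted.
cols = "id, amount, dbt_scd_id, dbt_valid_from, dbt_valid_to"
conn.execute(f"insert into dummy_table_hist ({cols}) "
             f"select {cols} from staging where dbt_change_type = 'insert'")

for row in conn.execute("select * from dummy_table_hist order by dbt_scd_id"):
    print(row)
# (1, 1, 'scd-1', 'T1', 'T2', 'T1')  <- problem 1: still the dbt_valid_from value
# (1, 2, 'scd-2', 'T2', None, None)  <- problem 2: NULL on the new row
```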

I am pinned to dbt-core==1.7.17 and dbt-redshift==1.7.7 because of other dependencies.

What should I do differently?


Solution

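  • I found the solution to my own question. 🎉 The build_snapshot_table override only runs when the snapshot table is first created. On later runs, dbt builds a staging table with snapshot_staging_table and applies it with snapshot_merge_sql, and the default versions of those macros know nothing about dbt_valid_to_or_from. So I needed to override three macros in the following two files: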

    I extended helpers.sql and added a new snapshot_merge.sql file.

    They contain the following code:

    helpers.sql:

    {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
    
        with snapshot_query as (
    
            {{ source_sql }}
    
        ),
    
        snapshotted_data as (
    
            select *,
                {{ strategy.unique_key }} as dbt_unique_key
    
            from {{ target_relation }}
            where dbt_valid_to is null
    
        ),
    
        insertions_source_data as (
    
            {# custom: dbt_valid_to is still null for new versions, so this resolves to dbt_valid_from #}
            select *, COALESCE(dbt_valid_to, dbt_valid_from) as dbt_valid_to_or_from
            from (
                select
                    *,
                    {{ strategy.unique_key }} as dbt_unique_key,
                    {{ strategy.updated_at }} as dbt_updated_at,
                    {{ strategy.updated_at }} as dbt_valid_from,
                    nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
                    {{ strategy.scd_id }} as dbt_scd_id
    
                from snapshot_query
            ) sbq
        ),
    
        updates_source_data as (
    
            select
                *,
                {{ strategy.unique_key }} as dbt_unique_key,
                {{ strategy.updated_at }} as dbt_updated_at,
                {{ strategy.updated_at }} as dbt_valid_from,
                {{ strategy.updated_at }} as dbt_valid_to
    
            from snapshot_query
        ),
    
        {%- if strategy.invalidate_hard_deletes %}
    
        deletes_source_data as (
    
            select
                *,
                {{ strategy.unique_key }} as dbt_unique_key
            from snapshot_query
        ),
        {% endif %}
    
        insertions as (
    
            select
                'insert' as dbt_change_type,
                source_data.*
    
            from insertions_source_data as source_data
            left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
            where snapshotted_data.dbt_unique_key is null
               or (
                    snapshotted_data.dbt_unique_key is not null
                and (
                    {{ strategy.row_changed }}
                )
            )
    
        ),
    
        updates as (
    
            select
                'update' as dbt_change_type,
                source_data.*,
                snapshotted_data.dbt_scd_id,
                {# custom: appended last so the column lines up with the insertions CTE in the UNION ALL #}
                COALESCE(source_data.dbt_valid_to, source_data.dbt_valid_from) as dbt_valid_to_or_from
    
            from updates_source_data as source_data
            join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
            where (
                {{ strategy.row_changed }}
            )
        )
    
        {%- if strategy.invalidate_hard_deletes -%}
        ,
    
        deletes as (
    
            select
                'delete' as dbt_change_type,
                source_data.*,
                {{ snapshot_get_time() }} as dbt_valid_from,
                {{ snapshot_get_time() }} as dbt_updated_at,
                {{ snapshot_get_time() }} as dbt_valid_to,
                snapshotted_data.dbt_scd_id,
                {# custom: keeps the deletes branch aligned with insertions in the UNION ALL #}
                {{ snapshot_get_time() }} as dbt_valid_to_or_from
    
            from snapshotted_data
            left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
            where source_data.dbt_unique_key is null
        )
        {%- endif %}
    
        select * from insertions
        union all
        select * from updates
        {%- if strategy.invalidate_hard_deletes %}
        union all
        select * from deletes
        {%- endif %}
    
    {%- endmacro %}
    
    {% macro default__build_snapshot_table(strategy, sql) %}
     
        select *, COALESCE(dbt_valid_to, dbt_valid_from) as dbt_valid_to_or_from
          from (
            select *,
                {{ strategy.scd_id }} as dbt_scd_id,
                {{ strategy.updated_at }} as dbt_updated_at,
                {{ strategy.updated_at }} as dbt_valid_from,
                nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
            from (
                {{ sql }}
            ) sbq
        )
     
    {% endmacro %}
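One detail worth keeping in mind when editing snapshot_staging_table: the final select stacks the insertions, updates and deletes CTEs with UNION ALL, which pairs columns by position, not by name. That is why each branch in the macro ends with dbt_valid_to_or_from. A small SQLite check (an illustration only, with placeholder values T1/T2) shows that a misordered branch silently swaps values instead of raising an error:

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Both branches list the columns in the same order.
aligned = """
    select 'insert' as dbt_change_type, 'T1' as dbt_valid_from, 'T1' as dbt_valid_to_or_from
    union all
    select 'update', 'T1', 'T2'
    order by 1
"""
# The second branch uses the same aliases in a different order;
# UNION ALL ignores those aliases and pairs columns by position.
misaligned = """
    select 'insert' as dbt_change_type, 'T1' as dbt_valid_from, 'T1' as dbt_valid_to_or_from
    union all
    select 'update', 'T2' as dbt_valid_to_or_from, 'T1' as dbt_valid_from
    order by 1
"""

for label, sql in (("aligned", aligned), ("misaligned", misaligned)):
    cur = conn.execute(sql)
    names = [d[0] for d in cur.description]  # names come from the first branch
    print(label, names, cur.fetchall())
```

In the misaligned case the update row ends up with 'T2' under dbt_valid_from, and nothing warns about it.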
    

    snapshot_merge.sql:

    {% macro postgres__snapshot_merge_sql(target, source, insert_cols) -%}
        {%- set insert_cols_csv = insert_cols | join(', ') -%}
    
        update {{ target }}
        set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to,
            {# custom: once a version is closed, dbt_valid_to_or_from follows dbt_valid_to #}
            dbt_valid_to_or_from = DBT_INTERNAL_SOURCE.dbt_valid_to
        from {{ source }} as DBT_INTERNAL_SOURCE
        where DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text
          and DBT_INTERNAL_SOURCE.dbt_change_type::text in ('update'::text, 'delete'::text)
          and {{ target }}.dbt_valid_to is null;
    
        insert into {{ target }} ({{ insert_cols_csv }})
        select {% for column in insert_cols -%}
            DBT_INTERNAL_SOURCE.{{ column }} {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
        from {{ source }} as DBT_INTERNAL_SOURCE
        where DBT_INTERNAL_SOURCE.dbt_change_type::text = 'insert'::text;
    {% endmacro %}
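To sanity-check the fixed merge without a Redshift cluster, its two statements can be replayed in SQLite from Python. This is a sketch under simplifying assumptions: a hand-written staging table stands in for the output of the overridden snapshot_staging_table, the placeholder strings T1/T2 stand in for timestamps, the ::text casts are dropped, and the UPDATE ... FROM is rewritten with correlated subqueries for portability.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    create table dummy_table_hist (
        id integer, amount integer, dbt_scd_id text,
        dbt_valid_from text, dbt_valid_to text, dbt_valid_to_or_from text)
""")
# State after the first run: one open version of id 1 (hypothetical values).
conn.execute("insert into dummy_table_hist values (1, 1, 'scd-1', 'T1', null, 'T1')")

# Staging rows shaped like the overridden snapshot_staging_table output:
# every branch now carries dbt_valid_to_or_from.
conn.execute("""
    create table staging (
        dbt_change_type text, id integer, amount integer, dbt_scd_id text,
        dbt_valid_from text, dbt_valid_to text, dbt_valid_to_or_from text)
""")
conn.executemany("insert into staging values (?, ?, ?, ?, ?, ?, ?)", [
    ("update", 1, 2, "scd-1", "T2", "T2", "T2"),
    ("insert", 1, 2, "scd-2", "T2", None, "T2"),
])

# Statement 1: close the open version, setting BOTH columns.
conn.execute("""
    update dummy_table_hist
    set dbt_valid_to = (select s.dbt_valid_to from staging s
                        where s.dbt_scd_id = dummy_table_hist.dbt_scd_id
                          and s.dbt_change_type in ('update', 'delete')),
        dbt_valid_to_or_from = (select s.dbt_valid_to from staging s
                                where s.dbt_scd_id = dummy_table_hist.dbt_scd_id
                                  and s.dbt_change_type in ('update', 'delete'))
    where dbt_valid_to is null
      and dbt_scd_id in (select dbt_scd_id from staging
                         where dbt_change_type in ('update', 'delete'))
""")
# Statement 2: insert new versions, now including dbt_valid_to_or_from.
cols = "id, amount, dbt_scd_id, dbt_valid_from, dbt_valid_to, dbt_valid_to_or_from"
conn.execute(f"insert into dummy_table_hist ({cols}) "
             f"select {cols} from staging where dbt_change_type = 'insert'")

for row in conn.execute("select * from dummy_table_hist order by dbt_scd_id"):
    print(row)
# (1, 1, 'scd-1', 'T1', 'T2', 'T2')
# (1, 2, 'scd-2', 'T2', None, 'T2')
```

The first row is the version closed by the update statement, now with dbt_valid_to_or_from equal to dbt_valid_to; the second is the new open version, where it equals dbt_valid_from. Those are exactly the two cases from the question.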