Our client wants to create a Power BI report that relies on an incrementally refreshed table. After some experiments, they defined all the columns they need for the report. These are dbt_valid_from, dbt_valid_to and dbt_valid_to_or_from.
After some googling, I found some information.
I implemented the following setup:
There is a snapshot SQL file in the dbt subfolder called dummy_table_hist.sql:
{% snapshot dummy_table_hist %}

{#-
    Snapshot of the bi_common.dummy_table source, written to dummy_schema.
    Configured with the 'check' strategy on the hash_diff column, keyed by
    hash_id, with invalidate_hard_deletes switched on.
-#}
{{
    config(
        target_schema='dummy_schema',
        unique_key='hash_id',
        strategy='check',
        check_cols=['hash_diff'],
        invalidate_hard_deletes=True
    )
}}

select *
from {{ source('bi_common', 'dummy_table') }}

{% endsnapshot %}
There is a helpers.sql file, which overrides the snapshot table generation logic:
{% macro default__build_snapshot_table(strategy, sql) %}
    {#-
        Override of the snapshot table generation query: wraps the snapshot's
        SELECT, adds the dbt_* bookkeeping columns and the extra
        dbt_valid_to_or_from column.

        Args:
            strategy: snapshot strategy object; this macro reads its `scd_id`
                      and `updated_at` SQL expressions.
            sql:      the SELECT statement to snapshot (embedded as a subquery).

        Both derived tables carry an alias: PostgreSQL before version 16
        rejects an unaliased subquery in FROM, and the alias is harmless on
        engines that do not require it.
    -#}

    select
        *,
        coalesce(dbt_valid_to, dbt_valid_from) as dbt_valid_to_or_from
    from (
        select
            *,
            {{ strategy.scd_id }} as dbt_scd_id,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            {#- nullif(x, x) is always NULL, but keeps the data type of x #}
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
        from (
            {{ sql }}
        ) sbq
    ) sbq_with_meta

{% endmacro %}
The initial data is as follows:
After creating dummy_schema.dummy_table and running the dbt snapshot command to create the dummy_schema.dummy_table_hist table, I changed the amount to 2 where id = 1. After running dbt snapshot again, the change appears in the dummy_schema.dummy_table_hist table, but there are two problems with it:
- Where both the dbt_valid_from and dbt_valid_to fields are filled, the dbt_valid_to_or_from column takes its value from dbt_valid_from instead of dbt_valid_to.
- For the newly inserted rows, the dbt_valid_to_or_from column is NULL. (For the first dbt snapshot run it works well.)

I use dbt-core==1.7.17 and dbt-redshift==1.7.7 because of other dependencies.
What should I do differently?
I found the solution to my own question. 🎉 I needed to override three macros across two files, so I extended the helpers.sql file and added a new snapshot_merge.sql file as well.
They contain the following code:
helpers.sql:
{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
    {#-
        Builds the staging query for a snapshot run. The result tags every row
        with a dbt_change_type of 'insert', 'update' or (only when
        strategy.invalidate_hard_deletes is set) 'delete', and carries the
        extra dbt_valid_to_or_from column in every branch.

        Args:
            strategy:        snapshot strategy object; this macro reads its
                             unique_key, updated_at, scd_id, row_changed and
                             invalidate_hard_deletes attributes.
            source_sql:      SELECT statement that yields the current source rows.
            target_relation: existing snapshot table; only its rows with
                             dbt_valid_to is null are considered.

        The three branches are combined with UNION ALL, which matches columns
        by position, so the column order of the branches must stay aligned.

        NOTE(review): strategy.row_changed is rendered outside this macro and
        presumably refers to the `snapshotted_data` and `source_data` aliases
        used below -- do not rename them without confirming.
    -#}

    with snapshot_query as (

        {{ source_sql }}

    ),

    {# rows of the snapshot table that are still open #}
    snapshotted_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key
        from {{ target_relation }}
        where dbt_valid_to is null

    ),

    {# candidate new versions: dbt_valid_to is NULL, so dbt_valid_to_or_from falls back to dbt_valid_from #}
    insertions_source_data as (

        select
            *,
            coalesce(dbt_valid_to, dbt_valid_from) as dbt_valid_to_or_from
        from (
            select
                *,
                {{ strategy.unique_key }} as dbt_unique_key,
                {{ strategy.updated_at }} as dbt_updated_at,
                {{ strategy.updated_at }} as dbt_valid_from,
                nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
                {{ strategy.scd_id }} as dbt_scd_id
            from snapshot_query
        ) src

    ),

    {# candidate close-outs: dbt_valid_to is set to the strategy's updated_at expression #}
    updates_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            {{ strategy.updated_at }} as dbt_valid_to
        from snapshot_query

    ),

    {%- if strategy.invalidate_hard_deletes %}

    deletes_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key
        from snapshot_query

    ),
    {% endif %}

    {# source rows with no open snapshot row, or whose open snapshot row has changed #}
    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*
        from insertions_source_data as source_data
        left join snapshotted_data
            on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or (
                snapshotted_data.dbt_unique_key is not null
                and (
                    {{ strategy.row_changed }}
                )
           )

    ),

    {# open snapshot rows whose source row has changed; dbt_scd_id comes from the existing snapshot row #}
    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id,
            coalesce(source_data.dbt_valid_to, source_data.dbt_valid_from) as dbt_valid_to_or_from
        from updates_source_data as source_data
        inner join snapshotted_data
            on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where (
            {{ strategy.row_changed }}
        )

    )
    {%- if strategy.invalidate_hard_deletes -%}
    ,

    {#
        Open snapshot rows with no matching source row.
        dbt_valid_from and dbt_updated_at are listed in the opposite order to
        the insert/update branches; every timestamp column here is the same
        snapshot_get_time() expression, so the positional UNION ALL is unaffected.
    #}
    deletes as (

        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as dbt_valid_from,
            {{ snapshot_get_time() }} as dbt_updated_at,
            {{ snapshot_get_time() }} as dbt_valid_to,
            snapshotted_data.dbt_scd_id,
            {{ snapshot_get_time() }} as dbt_valid_to_or_from
        from snapshotted_data
        left join deletes_source_data as source_data
            on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where source_data.dbt_unique_key is null

    )
    {%- endif %}

    select * from insertions
    union all
    select * from updates
    {%- if strategy.invalidate_hard_deletes %}
    union all
    select * from deletes
    {%- endif %}

{%- endmacro %}
{% macro default__build_snapshot_table(strategy, sql) %}
    {#-
        Override of the snapshot table generation query: wraps the snapshot's
        SELECT, adds the dbt_* bookkeeping columns and the extra
        dbt_valid_to_or_from column.

        Args:
            strategy: snapshot strategy object; this macro reads its `scd_id`
                      and `updated_at` SQL expressions.
            sql:      the SELECT statement to snapshot (embedded as a subquery).

        Both derived tables carry an alias: PostgreSQL before version 16
        rejects an unaliased subquery in FROM, and the alias is harmless on
        engines that do not require it.
    -#}

    select
        *,
        coalesce(dbt_valid_to, dbt_valid_from) as dbt_valid_to_or_from
    from (
        select
            *,
            {{ strategy.scd_id }} as dbt_scd_id,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            {#- nullif(x, x) is always NULL, but keeps the data type of x #}
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
        from (
            {{ sql }}
        ) sbq
    ) sbq_with_meta

{% endmacro %}
snapshot_merge.sql:
{% macro postgres__snapshot_merge_sql(target, source, insert_cols) -%}
    {#-
        Applies staged snapshot changes to the snapshot table in two statements:
          1. UPDATE: for staged 'update'/'delete' rows, set dbt_valid_to and
             dbt_valid_to_or_from (both from the staged dbt_valid_to) on the
             still-open target row with the same dbt_scd_id.
          2. INSERT: append staged 'insert' rows, restricted to insert_cols.

        Args:
            target:      snapshot table to modify.
            source:      staging relation, aliased DBT_INTERNAL_SOURCE.
            insert_cols: column names used for both the INSERT column list
                         and the SELECT list.
    -#}
    {%- set insert_cols_csv = insert_cols | join(', ') -%}

    update {{ target }}
    set
        dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to,
        dbt_valid_to_or_from = DBT_INTERNAL_SOURCE.dbt_valid_to
    from {{ source }} as DBT_INTERNAL_SOURCE
    where {{ target }}.dbt_valid_to is null
      and DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text
      and DBT_INTERNAL_SOURCE.dbt_change_type::text in ('update'::text, 'delete'::text);

    insert into {{ target }} ({{ insert_cols_csv }})
    select
        {%- for column in insert_cols %}
        DBT_INTERNAL_SOURCE.{{ column }}{% if not loop.last %},{% endif %}
        {%- endfor %}
    from {{ source }} as DBT_INTERNAL_SOURCE
    where DBT_INTERNAL_SOURCE.dbt_change_type::text = 'insert'::text;

{% endmacro %}