dbt, incremental-build

How can I union with the table referenced by {{this}} only if it exists, in a dbt model on Snowflake?


In dbt, I want to append to a table only if it already exists. I am building an incremental model that computes lagged values from the previous day, so on each run I want to union today's data with {{this}}. On the first run {{this}} does not exist yet, so I am trying to skip it in the union.

I am using the query below, but it returns an error:

{{
    config(
        materialized='incremental',
        unique_key='date_run',
        full_refresh=true
    )
}}

with subs_count as 
(
    select 
        current_date() as date_run,
        count(distinct email_address) as distinct_email,
        subscriber
    from {{ source('fan_table_sandbox', 'international_fan_data') }}
    group by date_run,subscriber
),
aggregated_data as 
(
    select
        *
    from subs_count

    union all

    if (exists((select *
                from {{this}} if exists {{this}}
    order by date_run desc limit 1))
),
lag_values as 
(
    select  
        *,
        lag(distinct_email) over (partition by subscriber order by  date_run asc) as previous_day_count
    from aggregated_data
)
select * from lag_values

How can this be resolved?


Solution

  • I believe you need to wrap the union in an is_incremental() condition. It is false on the first run, when {{ this }} does not exist yet, so the union branch is skipped.

    There are several additional issues:

    1. The column nba_subscriber does not exist.
    2. You take only the latest date_run row in aggregated_data (limit 1), but then partition by subscriber. As a result, only one subscriber's row is left from the previous date. You might want to use rank() instead, which keeps every row that shares the latest date_run.
    3. On dbt run --full-refresh all historical data is deleted, and with full_refresh=true in the config dbt treats every run as a full refresh, so is_incremental() is never true. Make sure that is expected. Otherwise, set the full_refresh config to false. A much better approach would be to add a created_date field to your international_fan_data table during the Extract and Load steps; then you don't need an incremental model at all.
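
    To illustrate the second point, here is a minimal standalone Snowflake sketch. The sample rows and the subscriber values 'yes' and 'no' are made up for illustration and stand in for the existing model table.

    ```sql
    -- Hypothetical sample data: two subscribers, two days.
    with history as (
        select
            column1::date as date_run,
            column2       as subscriber,
            column3       as distinct_email
        from values
            ('2024-01-01', 'yes', 100),
            ('2024-01-01', 'no',   40),
            ('2024-01-02', 'yes', 110),
            ('2024-01-02', 'no',   45)
    )
    select *
    from history
    -- rank() gives every row with the latest date_run the same rank (1),
    -- so both subscribers' rows for 2024-01-02 are kept
    qualify rank() over (order by date_run desc) = 1;
    ```

    Replacing the qualify clause with order by date_run desc limit 1 would return a single row, so one of the two subscribers would have no previous value to lag against.
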
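    {# full_refresh=false keeps history; with full_refresh=true every run rebuilds the table and is_incremental() is never true #}
    {{
        config(
            materialized='incremental',
            unique_key='date_run',
            full_refresh=false
        )
    }}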
    
    with subs_count as 
    (
        select 
            current_date() as date_run,
            count(distinct email_address) as distinct_email,
            subscriber
        from {{ source('fan_table_sandbox', 'international_fan_data') }}
        group by date_run,subscriber
    ),
    
    aggregated_data as 
    (
        select
            *
        from subs_count
    
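        {% if is_incremental() %}
    
        union all
    
        -- list the columns explicitly: the existing table also holds previous_day_count,
        -- so select * would not match the three columns of subs_count
        select date_run, distinct_email, subscriber
        from {{ this }}
        qualify rank() over (order by date_run desc) = 1
    
        {% endif %}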
    ),
    
    lag_values as 
    (
        select  
            *,
            lag(distinct_email) over (partition by subscriber order by  date_run asc) as previous_day_count
        from aggregated_data
    )
    
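    -- keep only today's rows; the previous day's rows were unioned in only to compute the lag
    select * from lag_values
    where date_run = current_date()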
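
    If the source table did carry a load date, the created_date suggestion from the third point could be sketched roughly as below. This assumes a created_date column (hypothetical, it is not in the question's table) that records the day each row was loaded, and that each day's load is a full snapshot. Under those assumptions a plain table model can compute the lag directly, without {{ this }}.

    ```sql
    {{ config(materialized='table') }}

    with subs_count as
    (
        select
            created_date as date_run,  -- hypothetical column added during Extract and Load
            count(distinct email_address) as distinct_email,
            subscriber
        from {{ source('fan_table_sandbox', 'international_fan_data') }}
        group by created_date, subscriber
    )

    select
        *,
        -- previous snapshot's count for the same subscriber; null for the first snapshot
        lag(distinct_email) over (partition by subscriber order by date_run asc) as previous_day_count
    from subs_count
    ```

    Because every snapshot stays in the source, the model can be rebuilt from scratch at any time without losing history.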