snowflake-cloud-data-platform dbt

Is there a way to create an incremental model without duplicates?


I have data flowing into the source table every hour, and I want to create an incremental model that consumes it and performs some aggregation. I have built the model below, but whenever it runs incrementally I get multiple entries per day instead of one aggregated entry:

{{ config(materialized='incremental') }}

WITH WEBSITE_MAIN_PAGE_VIEWS AS (
    SELECT
        DATE_TIME,
        POST_VISID_HIGH,
        POST_VISID_LOW,
        VISIT_NUM,
        VISIT_START_TIME_GMT,
        POST_PAGENAME,
        POST_EVENT_LIST,
        _DMLTIME

    FROM {{ ref('stg_adobe_hit_data_poc') }}
    WHERE
        1 = 1
        -- INCREMENTAL FILTER-----
        {% if is_incremental() %}
            AND _DMLTIME > (
                SELECT
                    COALESCE(
                        MAX(T._DMLTIME),
                        TO_TIMESTAMP_TZ('1900-01-01 00:00:00')
                    )
                FROM {{ this }} AS T
            )
        {% endif %}

)
    SELECT 
        'PO' AS BRAND,
        DATE(DATE_TIME) AS DATE,
        '*' AS BASE_MARKET,
        COUNT(DISTINCT CONCAT(POST_VISID_HIGH, POST_VISID_LOW, VISIT_NUM, VISIT_START_TIME_GMT)) AS TOTAL_WEBSITE_VISITS,
        COUNT(
            DISTINCT CASE
                WHEN POST_PAGENAME = 'po:en_gb'
                THEN CONCAT(POST_VISID_HIGH, POST_VISID_LOW, VISIT_NUM, VISIT_START_TIME_GMT)
            END
        ) AS HOME_PAGE_VISITS,
        MAX(_DMLTIME) AS _DMLTIME
    FROM WEBSITE_MAIN_PAGE_VIEWS
    GROUP BY DATE(DATE_TIME)

Here is a sample of the output the code above produces. If the model is not incremental, it works as expected. How do I convert this into an incremental model that aggregates data as it arrives, without any duplicate entries?


Solution

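  • I used the approach from the dbt documentation on incremental models, and it worked for me. Setting unique_key makes dbt update the existing row for a given day instead of appending a new one, and the >= filter re-reads the whole of the latest day so the aggregate is recomputed in full: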

    {{
        config(
            materialized='incremental',
            unique_key='date_day'
        )
    }}
    
    select
        date_trunc('day', event_at) as date_day,
        count(distinct user_id) as daily_active_users
    
    from {{ ref('app_data_events') }}
    
    
    {% if is_incremental() %}
    
      -- this filter will only be applied on an incremental run
      -- (uses >= to include records arriving later on the same day as the last run of this model)
      where date_day >= (select coalesce(max(date_day), '1900-01-01') from {{ this }})
    
    {% endif %}
    
    group by 1
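
    Applied to the model in the question, the same pattern might look like the sketch below. Without a unique_key, dbt appends the result of every incremental run, so each hourly run adds another row for the same day that covers only the newly arrived hits. The sketch assumes the output column DATE identifies one row per day, and it replaces the _DMLTIME filter with a date filter so that COUNT(DISTINCT ...) is recomputed over the whole day rather than over the new rows only. It has not been run against the asker's data.

    ```sql
    {{
        config(
            materialized='incremental',
            unique_key='DATE'
        )
    }}

    WITH WEBSITE_MAIN_PAGE_VIEWS AS (
        SELECT
            DATE_TIME,
            POST_VISID_HIGH,
            POST_VISID_LOW,
            VISIT_NUM,
            VISIT_START_TIME_GMT,
            POST_PAGENAME,
            _DMLTIME
        FROM {{ ref('stg_adobe_hit_data_poc') }}

        {% if is_incremental() %}
        -- Only applied on incremental runs: re-read every hit for the latest day
        -- already in the target table (and any later day), so that day's row is
        -- rebuilt from all of its hits and then replaced via unique_key.
        WHERE DATE(DATE_TIME) >= (
            SELECT COALESCE(MAX(T.DATE), TO_DATE('1900-01-01'))
            FROM {{ this }} AS T
        )
        {% endif %}
    )

    SELECT
        'PO' AS BRAND,
        DATE(DATE_TIME) AS DATE,
        '*' AS BASE_MARKET,
        COUNT(DISTINCT CONCAT(POST_VISID_HIGH, POST_VISID_LOW, VISIT_NUM, VISIT_START_TIME_GMT)) AS TOTAL_WEBSITE_VISITS,
        COUNT(
            DISTINCT CASE
                WHEN POST_PAGENAME = 'po:en_gb'
                THEN CONCAT(POST_VISID_HIGH, POST_VISID_LOW, VISIT_NUM, VISIT_START_TIME_GMT)
            END
        ) AS HOME_PAGE_VISITS,
        MAX(_DMLTIME) AS _DMLTIME
    FROM WEBSITE_MAIN_PAGE_VIEWS
    GROUP BY DATE(DATE_TIME)
    ```

    Two caveats with this sketch: hits that arrive late for a day earlier than the latest one in the target are not picked up by the filter, and a table that already contains duplicate rows from earlier runs would need one rebuild with dbt run --full-refresh before the unique_key takes effect cleanly.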