I have data flowing into the source table every hour and I am looking to create an incremental model that consumes it and performs some aggregate. I have done this but I seem to be getting multiple entries whenever the incremental model runs instead of 1 aggregated entry
{{ config(materialized='incremental')}}
WITH WEBSITE_MAIN_PAGE_VIEWS AS (
SELECT
DATE_TIME,
POST_VISID_HIGH,
POST_VISID_LOW,
VISIT_NUM,
VISIT_START_TIME_GMT,
POST_PAGENAME,
POST_EVENT_LIST,
_DMLTIME
FROM {{ ref('stg_adobe_hit_data_poc') }}
WHERE
1 = 1
-- INCREMENTAL FILTER-----
{% if is_incremental() %}
AND _DMLTIME > (
SELECT
COALESCE(
MAX(T._DMLTIME),
TO_TIMESTAMP_TZ('1900-01-01 00:00:00')
)
FROM {{ this }} AS T
)
{% endif %}
)
SELECT
'PO' AS BRAND,
DATE(DATE_TIME) AS DATE,
'*' AS BASE_MARKET,
COUNT(DISTINCT CONCAT(POST_VISID_HIGH, POST_VISID_LOW, VISIT_NUM, VISIT_START_TIME_GMT)) AS TOTAL_WEBSITE_VISITS,
COUNT(
DISTINCT CASE
WHEN POST_PAGENAME = 'po:en_gb'
THEN CONCAT(POST_VISID_HIGH, POST_VISID_LOW, VISIT_NUM, VISIT_START_TIME_GMT)
END
) AS HOME_PAGE_VISITS,
MAX(_DMLTIME) AS _DMLTIME
FROM WEBSITE_MAIN_PAGE_VIEWS
GROUP BY DATE(DATE_TIME)
Here. is the sample of the output the code above is producing. if its not an incremental model it works fine as expected. but how do I convert this into an incremental model and it aggregates data as it arrives without any duplicate entry
I used this approach from this documentation and it worked for me
{{
config(
materialized='incremental',
unique_key='date_day'
)
}}
select
date_trunc('day', event_at) as date_day,
count(distinct user_id) as daily_active_users
from {{ ref('app_data_events') }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
-- (uses >= to include records arriving later on the same day as the last run of this model)
where date_day >= (select coalesce(max(date_day), '1900-01-01') from {{ this }})
{% endif %}
group by 1