I have a data pipeline where, at random intervals, a staging table called stg is truncated and overwritten with records. Then, using MERGE, the records in stg should be merged into the dimension table dim according to the following rules (it's a slowly changing dimension of type 2):
- If an email in stg doesn't exist in dim, insert the row corresponding to that email into dim with label 'INSERT'.
- If an email exists in both stg and dim, check whether their corresponding data differs. If so, it's indicative of an 'UPDATE'.
- If an email doesn't exist in stg but does exist in dim, it has been deleted, so label it with 'DELETE'.
Emails are unique to each user in this organization.
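The three rules can be sketched as a pure function that compares the staging load with the latest dim row per email. This is a minimal illustration, not the pipeline itself: the dictionaries stand in for the tables, `classify` is a hypothetical name, and treating an email whose latest row is already a 'DELETE' as absent is an assumption the question does not state.

```python
from typing import Dict, List, Tuple


def classify(stg: Dict[str, str],
             latest_dim: Dict[str, Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Return the (email, dml_type) rows to append to dim.

    stg:        email -> row_hash for the current staging load.
    latest_dim: email -> (row_hash, dml_type) of the most recent dim row
                per email, i.e. the rnk = 1 subset.
    """
    # Assumption: an email whose latest dim row is a 'DELETE' counts as absent,
    # so it is not deleted again and is re-inserted if it reappears in stg.
    live = {email: h for email, (h, dml) in latest_dim.items() if dml != "DELETE"}

    changes: List[Tuple[str, str]] = []
    for email, row_hash in sorted(stg.items()):
        if email not in live:
            changes.append((email, "INSERT"))   # rule 1: new email
        elif live[email] != row_hash:
            changes.append((email, "UPDATE"))   # rule 2: data changed
    for email in sorted(live):
        if email not in stg:
            changes.append((email, "DELETE"))   # rule 3: missing from stg
    return changes


stg = {"a@example.com": "h1", "b@example.com": "h2-new", "d@example.com": "h4"}
latest_dim = {
    "b@example.com": ("h2", "INSERT"),
    "c@example.com": ("h3", "UPDATE"),
    "d@example.com": ("h4", "INSERT"),
}
print(classify(stg, latest_dim))
# [('a@example.com', 'INSERT'), ('b@example.com', 'UPDATE'), ('c@example.com', 'DELETE')]
```

Note that d@example.com produces no row because its hash matches the latest version.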
MERGE dim
USING stg
ON stg.email = dim.email
-- email is new: not in dim yet
WHEN NOT MATCHED THEN
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'INSERT', stg.extraction_timestamp)
-- email is in both tables, but its data has changed
WHEN MATCHED AND stg.row_hash <> dim.row_hash THEN
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'UPDATE', stg.extraction_timestamp)
-- email is in dim but missing from stg
WHEN NOT MATCHED BY SOURCE THEN
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp)
VALUES(stg.email, NULL, NULL, NULL, 'DELETE', stg.extraction_timestamp)
The problem? This query compares stg against the entirety of dim, when I actually only want to compare it with the following subset of dim, which holds the most recent row for each email:
select *
from (
select *,
RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from dim
) as hist
where rnk = 1
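The difference between joining against all of dim and joining against the rnk = 1 subset can be reproduced in a few lines. This sketch uses SQLite through Python's sqlite3 module (window functions need SQLite 3.25 or later) with hypothetical, trimmed-down tables; `mismatches` is an illustrative helper. Because the history keeps older versions of a row, a full join finds a hash mismatch against an old version even when the current data is unchanged.

```python
import sqlite3


def mismatches(conn: sqlite3.Connection, latest_only: bool) -> list:
    """Emails whose stg.row_hash differs from some joined dim row."""
    dim_source = (
        "(SELECT * FROM (SELECT *, RANK() OVER (PARTITION BY email "
        "ORDER BY extraction_timestamp DESC) AS rnk FROM dim) WHERE rnk = 1)"
        if latest_only else "dim"
    )
    sql = (f"SELECT DISTINCT stg.email FROM stg JOIN {dim_source} AS d "
           "ON stg.email = d.email WHERE stg.row_hash <> d.row_hash ORDER BY 1")
    return [row[0] for row in conn.execute(sql)]


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim (email TEXT, row_hash TEXT, dml_type TEXT, extraction_timestamp TEXT);
CREATE TABLE stg (email TEXT, row_hash TEXT, extraction_timestamp TEXT);
-- alice already changed once: her history holds an old and a current version
INSERT INTO dim VALUES
  ('alice@example.com', 'h1', 'INSERT', '2024-01-01'),
  ('alice@example.com', 'h2', 'UPDATE', '2024-02-01');
-- the new staging load carries alice's current, unchanged data
INSERT INTO stg VALUES ('alice@example.com', 'h2', '2024-03-01');
""")
print(mismatches(conn, latest_only=False))  # ['alice@example.com'] (spurious UPDATE)
print(mismatches(conn, latest_only=True))   # []
```

One design note: RANK() gives every tied row rank 1, so if two dim rows for the same email ever share an extraction_timestamp, the subset returns both. ROW_NUMBER() with the same window guarantees exactly one row per email.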
Is it possible to MERGE with dim as my target table and stg as my source, but based only on the rnk = 1 rows as computed by the RANK() analytic function shown above?
Something like this?
MERGE (
select *,
RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from dim
) as dim_with_rank
USING stg
ON stg.email = dim_with_rank.email
AND dim_with_rank.rnk = 1
WHEN NOT MATCHED THEN
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'INSERT', stg.extraction_timestamp)
WHEN MATCHED AND stg.row_hash <> dim_with_rank.row_hash THEN
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'UPDATE', stg.extraction_timestamp)
WHEN NOT MATCHED BY SOURCE THEN
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp)
VALUES(stg.email, NULL, NULL, NULL, 'DELETE', stg.extraction_timestamp)
Unfortunately, the target of a MERGE has to be a table, not a subquery, so you can't write it the way your example does:
MERGE (
select *,
RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from dim)
as dim_with_rank
USING stg
ON stg.email = dim_with_rank.email
AND dim_with_rank.rnk = 1
Instead, you would need to move the subquery into the USING clause, as the source, with a query like this one:
MERGE dim
USING (
select *, RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from stg )
as stg_with_rank
ON stg_with_rank.email = dim.email AND
stg_with_rank.rnk = 1 [...]
You can see more information about this use case here
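A caveat worth checking against your engine's MERGE reference: in BigQuery and SQL Server, for example, WHEN MATCHED and WHEN NOT MATCHED BY SOURCE may only UPDATE or DELETE, so the INSERT branches in the question would be rejected regardless of how the target is filtered. Since every SCD2 outcome here appends a row, one alternative (a swapped-in technique, not the answer's MERGE) is a single INSERT ... SELECT that keeps the rnk = 1 subquery in a CTE. The sketch runs on SQLite 3.25+ via Python's sqlite3; the trimmed schema and `load` helper are hypothetical, it uses ROW_NUMBER() to avoid ties, and it assumes that an email whose latest row is a 'DELETE' counts as absent and that a deletion takes the current load's timestamp.

```python
import sqlite3

# Hypothetical, trimmed-down schema: only the columns the sketch needs.
SCHEMA = """
CREATE TABLE dim (email TEXT, first_name TEXT, row_hash TEXT,
                  dml_type TEXT, extraction_timestamp TEXT);
CREATE TABLE stg (email TEXT, first_name TEXT, row_hash TEXT,
                  extraction_timestamp TEXT);
"""

# Append INSERT/UPDATE/DELETE rows to dim, comparing stg only with the
# latest non-deleted dim row per email.
SCD2_LOAD = """
WITH latest AS (
  SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY email
                                 ORDER BY extraction_timestamp DESC) AS rn
    FROM dim
  ) WHERE rn = 1 AND dml_type <> 'DELETE'   -- assumption: deleted = absent
)
INSERT INTO dim (email, first_name, row_hash, dml_type, extraction_timestamp)
SELECT s.email, s.first_name, s.row_hash,
       CASE WHEN l.email IS NULL THEN 'INSERT' ELSE 'UPDATE' END,
       s.extraction_timestamp
FROM stg AS s LEFT JOIN latest AS l ON s.email = l.email
WHERE l.email IS NULL OR s.row_hash <> l.row_hash
UNION ALL
-- assumption: a deletion is stamped with the current load's timestamp
SELECT l.email, NULL, NULL, 'DELETE', (SELECT MAX(extraction_timestamp) FROM stg)
FROM latest AS l
WHERE NOT EXISTS (SELECT 1 FROM stg AS s WHERE s.email = l.email)
"""


def load(conn: sqlite3.Connection, rows: list) -> None:
    """Truncate and reload stg, then apply the SCD2 load in one transaction."""
    with conn:
        conn.execute("DELETE FROM stg")
        conn.executemany("INSERT INTO stg VALUES (?, ?, ?, ?)", rows)
        conn.execute(SCD2_LOAD)


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
load(conn, [("a@example.com", "Ann", "h1", "t1"), ("b@example.com", "Bob", "h2", "t1")])
load(conn, [("a@example.com", "Ann", "h1", "t2"), ("b@example.com", "Rob", "h3", "t2")])
load(conn, [("b@example.com", "Rob", "h3", "t3")])
for row in conn.execute(
        "SELECT email, dml_type, extraction_timestamp FROM dim "
        "ORDER BY extraction_timestamp, email"):
    print(row)
```

After the three loads, dim should hold an INSERT for each email at t1, an UPDATE for b@example.com at t2 (its hash changed), and a DELETE for a@example.com at t3; the unchanged a@example.com row at t2 is skipped because only the latest version is compared.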