sqlgoogle-bigquerydata-warehousescd

Using MERGE with analytic functions (like RANK) on target table


I have a data pipeline where, at random intervals, the a staging table called stg is truncated and overwritten with records. Then, using MERGE, the records in stg should be merged into the dimension table dim according to the following rules (it's a slowly changing dimension of type 2):

Emails are unique to each user in this organization.

MERGE dim
USING stg
ON stg.email = dim.email
WHEN NOT MATCHED
    INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
    VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'INSERT', stg.extraction_timestamp))
WHEN MATCHED AND stg.row_hash <> dim.row_hash
    INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
    VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'UPDATE', stg.extraction_timestamp)
WHEN NOT MATCHED BY SOURCE 
    INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
    VALUES(stg.email, NULL, NULL, NULL, 'DELETE', stg.extraction_timestamp)

The problem? This query compares stg against the entirety of dim, when actually I only want to compare it with the following subset of dim:

select *
from (
    select *, 
    RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk 
    from dim
) as hist
where rnk = 1

Is it possible for me to MERGE with dim as my target table, and stg as my source, but based only on the rnk=1 values in as computed in the RANK() analytical function shown above?

Something like this?:

MERGE (
    select *, 
    RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk 
    from dim
) as dim_with_rank
USING stg
ON stg.email = dim_with_rank.email
AND dim_with_rank.rnk = 1
WHEN NOT MATCHED
    INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
    VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'INSERT', stg.extraction_timestamp))
WHEN MATCHED AND stg.row_hash <> dim.row_hash
    INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
    VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'UPDATE', stg.extraction_timestamp)
WHEN NOT MATCHED BY SOURCE 
    INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
    VALUES(stg.email, NULL, NULL, NULL, 'DELETE', stg.extraction_timestamp)

Solution

  • Unfortunately, you can’t run a merge and then launch a subquery as your example:

    MERGE (
          select *,
      RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
      from dim)
    as dim_with_rank
    USING stg
    ON stg.email = dim_with_rank.email
    AND dim_with_rank.rnk = 1
    

    You would need to create a query like the next one:

    MERGE dim 
    USING (
            select *, RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
            from stg )
     as stg_with_rank
     ON stg_with_rank.email = dim.email AND
     stg_with_rank.rnk = 1 [...]
    

    You can see more information about this use case here