sql jinja2 parquet dbt dbt-cloud

Select unique records in SQL union based on latest updated time value


I have incremental data loaded every day into different folders like

./././day=26/<time>/<name>.parquet

Sometimes the pipeline fails and we rerun it, so we end up with multiple time folders under a single day folder, which may or may not overlap with earlier folders. To be safe, I have a macro in my dbt project which gets the file paths of all time folders for the current day, followed by a select statement with UNION ALL to keep all records. But since the model is incremental, I get an error saying that for a single ID there are two different values and dbt doesn't know which to pick.

Is there a way to select all the data and then filter it based on the latest update time? There is a column ets which captures this update time.

SQL statement:

{% set directory_path = get_dynamic_path(table_name = 'table name') %}

{% for file_path in directory_path %}
    SELECT 
        *,
        date_add(MILLISECOND, ets, '1970-01-01') as ets_converted
    FROM
        PARQUET.{{ file_path }}
    {% if not loop.last %}UNION ALL{% endif %}
{% endfor %}

This works fine for a single time folder, or for time folders with no overlaps, but fails when they overlap.


Solution

  • With row_number()

    Your database probably supports the row_number() window function, which could tag each row with its relative position against other rows with the same ID; then you could filter to get only the most recent row (which will be the first if we ORDER BY ets DESC):

    {% set directory_path = get_dynamic_path(table_name = 'table name') %}
    
    WITH everything AS
    (
    {% for file_path in directory_path %}
        SELECT 
            *,
            date_add(MILLISECOND, ets, '1970-01-01') as ets_converted
        FROM
            PARQUET.{{ file_path }}
        {% if not loop.last %}UNION ALL{% endif %}
    {% endfor %}
    ),
    ordered AS
    (
        -- rank the rows of each ID from most recent (pos = 1) to oldest
        SELECT *, row_number() OVER (PARTITION BY <your unique ID> ORDER BY ets DESC) AS pos
        FROM everything
    )
    SELECT * FROM ordered WHERE pos = 1
    

    Note that even if multiple rows for a given ID have the same ets (pure duplicates), row_number() ensures only one of them will get pos = 1.
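
To check the row_number() pattern without access to the parquet files, here is a minimal sketch that runs the same query shape against an in-memory SQLite database from Python. The tables folder_a and folder_b, the columns id and name, and the sample rows are all hypothetical stand-ins for two overlapping time folders; it also assumes a SQLite build with window function support (3.25 or later), which your actual engine may or may not resemble in other respects.

```python
import sqlite3

# Hypothetical sample data: two "time folders" for the same day.
# They overlap on id 1 (updated later) and id 2 (pure duplicate).
folder_a = [(1, "alice-old", 1000), (2, "bob", 1500)]
folder_b = [(1, "alice-new", 2000), (2, "bob", 1500), (3, "carol", 1800)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE folder_a (id INTEGER, name TEXT, ets INTEGER)")
conn.execute("CREATE TABLE folder_b (id INTEGER, name TEXT, ets INTEGER)")
conn.executemany("INSERT INTO folder_a VALUES (?, ?, ?)", folder_a)
conn.executemany("INSERT INTO folder_b VALUES (?, ?, ?)", folder_b)

# Same shape as the dbt model: UNION ALL everything, rank per id by ets,
# keep only the most recent row of each id.
DEDUP_SQL = """
WITH everything AS (
    SELECT * FROM folder_a
    UNION ALL
    SELECT * FROM folder_b
),
ordered AS (
    SELECT *, row_number() OVER (PARTITION BY id ORDER BY ets DESC) AS pos
    FROM everything
)
SELECT id, name, ets FROM ordered WHERE pos = 1 ORDER BY id
"""

latest = conn.execute(DEDUP_SQL).fetchall()
print(latest)
```

Each id comes back exactly once: id 1 with its later ets, id 2 once despite appearing in both folders, and id 3 untouched.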

  • With DISTINCT ON

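    row_number() is the most portable way to implement this. Depending on the engine used to read your parquet files, you may also have access to DISTINCT ON (a PostgreSQL extension that some other engines have adopted), which simplifies the query a bit. It keeps the first row of each ID according to the ORDER BY, so the ID has to come first in the ordering, followed by ets DESC: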

    WITH everything AS
    (
        […]
    )
    SELECT DISTINCT ON (<your unique ID>) *
    FROM everything
    ORDER BY <your unique ID>, ets DESC