I have incremental data loaded every day into different folders, like
./././day=26/<time>/<name>.parquet
Sometimes the pipeline fails and we rerun it, so we end up with multiple time folders under a single day folder, which may or may not overlap with the earlier folders. To be safe, I have a macro in my dbt project that gets the file paths of all time folders for the current day, followed by a select statement with UNION ALL to keep all records. But since the model is incremental, I get an error saying that a single ID has two different values and dbt doesn't know which one to pick.
Is there a way to pick up all the data and then keep only the row with the latest update time? There is a column ets which captures this update time.
SQL statement:
{% set directory_path = get_dynamic_path(table_name = 'table name') %}
{% for file_path in directory_path %}
SELECT
*,
date_add(MILLISECOND, ets, '1970-01-01') as ets_converted
FROM
PARQUET.{{ file_path }}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
This works fine for a single time folder, or for time folders with no overlaps, but fails when the folders overlap.
row_number()
Your database probably supports the row_number() window function, which can tag each row with its position relative to the other rows with the same ID; you can then filter to keep only the most recent row (which will be the first one if we ORDER BY ets DESC):
{% set directory_path = get_dynamic_path(table_name = 'table name') %}
WITH everything AS
(
{% for file_path in directory_path %}
SELECT
*,
date_add(MILLISECOND, ets, '1970-01-01') as ets_converted
FROM
PARQUET.{{ file_path }}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}
),
ordered AS (SELECT *, row_number() OVER (PARTITION BY <your unique ID> ORDER BY ets DESC) AS pos FROM everything)
SELECT * FROM ordered WHERE pos = 1
Note that even if multiple rows for a given ID have the same ets (pure duplicates), row_number() ensures only one of them will get pos = 1.
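To see the deduplication on a small scale, here is a minimal sketch that runs the same row_number() pattern through Python's built-in sqlite3 module. The table layout (id, value, ets) and the sample rows are made up for illustration and only stand in for the result of the UNION ALL; it assumes the bundled SQLite is 3.25 or newer, since that is when window functions were added.

```python
import sqlite3

# Hypothetical rows standing in for the UNION ALL over overlapping time folders.
# Columns: id, value, ets (epoch milliseconds).
rows = [
    (1, "old", 1_700_000_000_000),
    (1, "new", 1_700_000_600_000),   # same id, reloaded later with a newer ets
    (2, "only", 1_700_000_000_000),
    (3, "dup", 1_700_000_300_000),
    (3, "dup", 1_700_000_300_000),   # pure duplicate: same id, same ets
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE everything (id INTEGER, value TEXT, ets INTEGER)")
conn.executemany("INSERT INTO everything VALUES (?, ?, ?)", rows)

# Rank the rows of each id from newest to oldest, then keep only rank 1.
query = """
WITH ordered AS (
    SELECT *, row_number() OVER (PARTITION BY id ORDER BY ets DESC) AS pos
    FROM everything
)
SELECT id, value, ets FROM ordered WHERE pos = 1 ORDER BY id
"""
latest = conn.execute(query).fetchall()
print(latest)
# [(1, 'new', 1700000600000), (2, 'only', 1700000000000), (3, 'dup', 1700000300000)]
```

Each id comes out exactly once: id 1 keeps its newer row, and the two identical rows for id 3 collapse into one.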
DISTINCT ON
row_number() is the most portable way to implement this; depending on the RDBMS engine used to access your parquet files, you may also have access to DISTINCT ON, which would simplify the expression a bit:
WITH everything AS
(
[…]
)
SELECT DISTINCT ON (<your unique ID>) *
FROM everything
ORDER BY <your unique ID>, ets DESC