I have a Kafka source with CDC (Debezium) events. I have tried various aggregation approaches (TVF hop windows, over aggregation, GROUP BY hop windows, etc.), but each has its own problem: it either doesn't support changelog (CDC) input, or its results don't get updated as new events come in. The windowing-TVF case is sketched right below.
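For illustration, a minimal version of the TVF hop-window attempt (table and column names taken from the views further down); before Flink 1.19 the planner rejects this because the window aggregate cannot consume the update/delete rows that a Debezium source emits:

SELECT
  poolId,
  window_start,
  window_end,
  -- total swap volume per pool over each 24 h window, sliding every minute
  SUM(amountUSD) AS totalVolumeUSD
FROM TABLE(
  HOP(TABLE swaps_stream_day, DESCRIPTOR(`rowtime`),
      INTERVAL '1' MINUTE, INTERVAL '24' HOUR))
GROUP BY poolId, window_start, window_end;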
The approach below at least works, but it carries a lot of overhead (redundant HOP calculations):
CREATE VIEW last_1_day_volumes AS
SELECT *, PROCTIME() AS proc_time
FROM (
  SELECT
    *,
    -- rank windows per pool, newest first
    ROW_NUMBER() OVER (
      PARTITION BY poolId
      ORDER BY window_time DESC
    ) AS rn
  FROM (
    SELECT
      poolId,
      SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
      COUNT(*) AS totalVolumeSwaps,
      MAX(`timestamp`) AS `timestamp`,
      MAX(`blockNumber`) AS `maxBlockNumber`,
      MIN(`blockNumber`) AS `minBlockNumber`,
      -- 24 h windows sliding every minute
      HOP_ROWTIME(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_time
    FROM swaps_stream_day
    WHERE
      amountUSD IS NOT NULL AND amountUSD > 0
      -- prune events older than the window size on the initial scan
      AND `rowtime` >= CAST(CURRENT_TIMESTAMP - INTERVAL '24' HOUR AS TIMESTAMP(3))
    GROUP BY
      poolId,
      HOP(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '24' HOUR)
  )
) WHERE rn = 1;  -- keep only the most recent 24 h window per pool
CREATE VIEW prev_1_day_volumes AS
SELECT *, PROCTIME() AS proc_time
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY poolId
      ORDER BY window_time DESC
    ) AS rn
  FROM (
    SELECT
      poolId,
      SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
      COUNT(*) AS totalVolumeSwaps,
      MAX(`timestamp`) AS `timestamp`,
      MAX(`blockNumber`) AS `maxBlockNumber`,
      MIN(`blockNumber`) AS `minBlockNumber`,
      -- 24 h windows sliding every 10 minutes, over the last 48 h of data
      HOP_ROWTIME(`rowtime`, INTERVAL '10' MINUTE, INTERVAL '24' HOUR) AS window_time
    FROM swaps_stream_day
    WHERE
      amountUSD IS NOT NULL AND amountUSD > 0
      AND `rowtime` >= CAST(CURRENT_TIMESTAMP - INTERVAL '48' HOUR AS TIMESTAMP(3))
    GROUP BY
      poolId,
      HOP(`rowtime`, INTERVAL '10' MINUTE, INTERVAL '24' HOUR)
  )
) WHERE rn = 6;  -- the 6th most recent window per pool, i.e. the 24 h window ending 5 slides (50 minutes) before the latest
This means that on initial deployment the aggregation nodes are extremely busy just computing HOP windows that the rn filter immediately throws away. Is there a more efficient way (in terms of memory usage and initial scan time) to do this in Flink SQL?
I also tried the other alternatives mentioned above with no luck; the over-aggregation variant is sketched below.
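A rough sketch of the over-aggregation variant (same table, assuming `rowtime` is the rowtime attribute). It avoids the redundant HOP buckets because state is kept per row rather than per window, but it has two problems here: pre-1.19 it is likewise rejected over changelog input, and a pool's output only advances when a new row for that pool arrives:

SELECT
  poolId,
  `rowtime`,
  -- rolling 24 h totals, emitted per incoming row
  SUM(amountUSD) OVER w AS totalVolumeUSD,
  COUNT(*)       OVER w AS totalVolumeSwaps
FROM swaps_stream_day
WINDOW w AS (
  PARTITION BY poolId
  ORDER BY `rowtime`
  RANGE BETWEEN INTERVAL '24' HOUR PRECEDING AND CURRENT ROW
);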
Answer:

FLINK-20281 (window aggregation supports changelog input streams) will be included in Flink 1.19, and it will address your use case directly. The feature is in the master branch now, but it won't be released for a few weeks, until the release is finalized. Until then, I can't really think of a good workaround.
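Once it is released, the dedup scaffolding should become unnecessary: the windowing TVF can run directly on the Debezium source. A sketch of what last_1_day_volumes could collapse to, assuming FLINK-20281 behaves as described:

-- hypothetical Flink 1.19+ rewrite; one row per pool per 24 h window,
-- sliding every minute, updated as the changelog evolves
CREATE VIEW last_1_day_volumes AS
SELECT
  poolId,
  SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
  COUNT(*) AS totalVolumeSwaps,
  MAX(`timestamp`) AS `timestamp`,
  window_start,
  window_end
FROM TABLE(
  HOP(TABLE swaps_stream_day, DESCRIPTOR(`rowtime`),
      INTERVAL '1' MINUTE, INTERVAL '24' HOUR))
WHERE amountUSD IS NOT NULL AND amountUSD > 0
GROUP BY poolId, window_start, window_end;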