apache-kafka apache-flink flink-streaming flink-sql

Efficient Flink SQL to SUM over the last 24 hours and the 24 hours before that, on CDC events?


I have a Kafka source with CDC (Debezium) events. I tried various aggregation approaches (HOP window TVF, OVER aggregation, GROUP BY with HOP windows, etc.), but each has its own problem: either it does not support CDC (changelog) input, or its result is not updated as new events come in.

The current approach at least works, but it has a lot of overhead (redundant HOP calculations):


CREATE VIEW last_1_day_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            HOP_ROWTIME(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as window_time
        FROM
            swaps_stream_day
        WHERE
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '24' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '24' HOUR)
    )
) WHERE rn = 1;

CREATE VIEW prev_1_day_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            HOP_ROWTIME(`rowtime`, INTERVAL '10' MINUTE, INTERVAL '24' HOUR) as window_time
        FROM
            swaps_stream_day
        WHERE
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '48' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '10' MINUTE, INTERVAL '24' HOUR)
    )
) WHERE rn = 6;

This means that on initial deployment the AGG nodes are extremely busy just because they are computing HOP windows that are irrelevant to the result. Is there a more efficient way (in terms of memory usage and initial scan time) to do this in Flink SQL?

I used these other alternatives with no luck:


Solution

  • FLINK-20281 ("Window aggregation supports changelog input stream") will be included in Flink 1.19. It should address your use case directly.

    This feature is already in the master branch, but it won't be available for a few weeks, until the 1.19 release is finalized.

    Until then, I can't really think of a good workaround.

    https://issues.apache.org/jira/browse/FLINK-20281
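
For illustration only, here is a rough, untested sketch of how the last-24-hours aggregation from the question might be written with the HOP window TVF once changelog input is supported (Flink 1.19 or later). It assumes `rowtime` is an event-time attribute with a watermark on swaps_stream_day, as the question's use of HOP_ROWTIME implies. The view name last_1_day_volumes_tvf is made up for this example, and the sketch does not select the "previous 24 hours" window.

```sql
-- Hypothetical view name; table and column names are taken from the question.
CREATE VIEW last_1_day_volumes_tvf AS
SELECT
    poolId,
    window_start,
    window_end,
    SUM(amountUSD) AS totalVolumeUSD,
    COUNT(*) AS totalVolumeSwaps,
    MAX(`blockNumber`) AS `maxBlockNumber`,
    MIN(`blockNumber`) AS `minBlockNumber`
FROM TABLE(
    -- 24-hour windows sliding every minute over the event-time attribute
    HOP(TABLE swaps_stream_day, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTE, INTERVAL '24' HOUR)
)
-- amountUSD > 0 already excludes NULLs, so no COALESCE is needed in the SUM
WHERE amountUSD > 0
GROUP BY
    poolId,
    window_start,
    window_end;
```

Whether this actually reduces memory usage or initial scan time compared with the GROUP BY HOP version would need to be measured on the real job.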