Tags: sql, apache-flink, flink-sql

Flink SQL Hints in Confluent Flink Window Query


How do I use hints in a windowing query on Confluent Flink?

Example of a hint:

/*+ OPTIONS('scan.startup.mode'='latest-offset') */

I would like to use it in a query like the one below:

INSERT INTO topic2(id, name, avgValue)
SELECT id, name, AVG(value) as avgValue
FROM TABLE(TUMBLE(TABLE topic1, DESCRIPTOR(messageTimestamp), INTERVAL '30' SECONDS))
GROUP BY window_start, window_end, id, name;

Solution

  • To do this, replace the TABLE topic1 argument of TUMBLE with a SELECT subquery and attach the hint to the table reference inside that subquery. (Note that the subquery must be wrapped in parentheses.)

    This should do it (value is quoted with backticks because VALUE is a reserved keyword in Flink SQL):

    INSERT INTO topic2(id, name, avgValue)
    SELECT id, name, AVG(`value`) AS avgValue
    FROM TABLE(
      TUMBLE(
        (SELECT * FROM topic1 /*+ OPTIONS('scan.startup.mode'='latest-offset') */),
        DESCRIPTOR(messageTimestamp),
        INTERVAL '30' SECONDS
      ))
    GROUP BY window_start, window_end, id, name;
    

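    Similar queries may need to list the DESCRIPTOR column explicitly in the subquery's SELECT list when it is a system or metadata virtual column such as $rowtime, since SELECT * may not return it. That doesn't appear to be necessary here, because messageTimestamp is a regular column of topic1.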
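
For illustration, here is a sketch of what the $rowtime variant might look like. It assumes the window is keyed on Confluent's $rowtime system column instead of messageTimestamp, and that the time attribute is preserved through the subquery's projection. I have not run this against a real cluster, so treat it as a starting point rather than a verified query.

```sql
-- Sketch only: assumes topic1 is windowed on the Confluent system column
-- $rowtime, which is listed explicitly because SELECT * may not return it.
-- `value` is backtick-quoted because VALUE is a reserved keyword in Flink SQL.
INSERT INTO topic2(id, name, avgValue)
SELECT id, name, AVG(`value`) AS avgValue
FROM TABLE(
  TUMBLE(
    (SELECT id, name, `value`, $rowtime
     FROM topic1 /*+ OPTIONS('scan.startup.mode'='latest-offset') */),
    DESCRIPTOR($rowtime),
    INTERVAL '30' SECONDS
  ))
GROUP BY window_start, window_end, id, name;
```

The only differences from the query above are the explicit column list in the subquery and the DESCRIPTOR argument; the hint stays attached to the topic1 reference.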