apache-flink flink-streaming flink-sql

Flink SQL LAG function with a window frame does not work


/** mode('streaming')*/
CREATE OR REPLACE TABLE eoj_table (
   `pk` string,
   `id` string,
   `name` string,
   `headers` MAP<STRING, BYTES> METADATA,
   `hard_deleted` boolean,
   `kafka_key` STRING,
   `ts` timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
   `procTime` AS PROCTIME(),
    WATERMARK FOR ts AS ts
) WITH (
   'connector' = 'kafka',
   'properties.bootstrap.servers' = 'kafka:29092',
   'properties.group.id' = 'group_id_1',
   'topic-pattern' = '^topic(_backfill)?$',
   'value.format' = 'json',
   'format' = 'json',
   'key.format' = 'raw',
   'key.fields' = 'kafka_key',
   'value.fields-include' = 'EXCEPT_KEY',
   'scan.startup.mode' = 'earliest-offset',
   'json.timestamp-format.standard' = 'ISO-8601',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
);

I have the above Kafka table, and I am trying to run this query:

SELECT LAG(pk) 
  OVER (
    PARTITION BY id ORDER BY procTime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
  ) 
AS prev_data_hash 
FROM eoj_table;

I get this error:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions

How do I use the LAG function and have it look at only the current row and the previous row?


Solution

  • You can do it like this:

    SELECT LAG(pk, 1) 
      OVER (
        PARTITION BY id ORDER BY procTime
      ) 
    AS prev_data_hash 
    FROM eoj_table;
    

    This query doesn't specify a ROWS or RANGE window frame, but that doesn't matter. It's the LAG function that manages the state for this query, and its implementation only keeps as many previous values in its accumulator as necessary (in this case, it will keep 2).

    If you're curious, here's the relevant code from Flink's LagAggFunction:

        public void accumulate(LagAcc<T> acc, T value) throws Exception {
            acc.buffer.add(value);
            // trim the buffer so it never holds more than offset + 1 values
            while (acc.buffer.size() > acc.offset + 1) {
                acc.buffer.removeFirst();
            }
        }
    

    In this case, the value of the offset is 1, so it keeps a buffer of 2 values.
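
    For what it's worth, the ROWS frame itself is not the problem: ordinary OVER aggregates in streaming mode do accept an explicit frame, as long as the ORDER BY is on a time attribute. It's LAG (together with the other offset- and rank-style functions named in the error) that the validator refuses a frame for. As a quick sketch against the same table (COUNT here just stands in for any regular aggregate):

        SELECT id,
               COUNT(*) OVER (
                 PARTITION BY id ORDER BY procTime
                 ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
               ) AS rows_in_frame
        FROM eoj_table;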
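
    If you'd rather not get NULL for the first row of each partition (where no previous row exists yet), LAG also accepts a default value as an optional third argument. A sketch, assuming the LAG(expression, offset, default) form; depending on your Flink version you may need to CAST the default so its type matches the column:

        SELECT LAG(pk, 1, 'n/a')
          OVER (
            PARTITION BY id ORDER BY procTime
          )
        AS prev_data_hash
        FROM eoj_table;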