/** mode('streaming')*/
-- Kafka source table. Reads every topic matching ^topic(_backfill)?$
-- (i.e. `topic` and `topic_backfill`) starting from the earliest offset.
CREATE OR REPLACE TABLE eoj_table (
    `pk`           STRING,
    `id`           STRING,
    `name`         STRING,
    -- Kafka record headers, exposed as connector metadata.
    `headers`      MAP<STRING, BYTES> METADATA,
    `hard_deleted` BOOLEAN,
    -- Message key, decoded with 'key.format' = 'raw' (see 'key.fields').
    `kafka_key`    STRING,
    -- Kafka record timestamp; VIRTUAL means it is read-only and is not
    -- written back when inserting into this table.
    `ts`           TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    -- Processing-time attribute; used to ORDER BY in OVER windows.
    `procTime`     AS PROCTIME(),
    -- Watermark with no out-of-orderness allowance (strictly ascending ts).
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id' = 'group_id_1',
    'topic-pattern' = '^topic(_backfill)?$',
    -- 'format' is equivalent to 'value.format'; set only one of them.
    -- The unprefixed 'json.*' options below belong to this format.
    'format' = 'json',
    'key.format' = 'raw',
    'key.fields' = 'kafka_key',
    -- The key column is populated from the key only, not from the JSON value.
    'value.fields-include' = 'EXCEPT_KEY',
    'scan.startup.mode' = 'earliest-offset',
    'json.timestamp-format.standard' = 'ISO-8601',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
I have the Kafka table above and am trying to run this query:
-- Rejected by the validator: LAG does not accept an explicit ROWS/RANGE frame.
SELECT
    LAG(pk) OVER (
        PARTITION BY id
        ORDER BY procTime
        ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    ) AS prev_data_hash
FROM eoj_table;
I get this error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
How do I use the LAG function so that it looks only at the current row and the previous row?
You can do it like this:
-- For each row, return pk from the previous row of the same id
-- (ordered by processing time); NULL for the first row of an id.
SELECT
    LAG(pk, 1) OVER (PARTITION BY id ORDER BY procTime) AS prev_data_hash
FROM eoj_table;
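This query doesn't specify a ROWS or RANGE frame, and it doesn't need one. The error above occurs because Calcite, which Flink uses to validate SQL, does not allow a frame clause with LAG or LEAD. The message mentions only RANK, DENSE_RANK and ROW_NUMBER, but the same check applies. The LAG function manages the state for this query itself, and its implementation keeps only as many values in its accumulator as necessary (in this case 2: the current value and the previous one).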
If you're curious, here's the relevant code:
public void accumulate(LagAcc<T> acc, T value) throws Exception {
    // Record the newest value at the tail of the buffer.
    acc.buffer.add(value);
    // Evict from the head, one entry per pass, until no more than
    // offset + 1 values remain (the current one plus `offset` predecessors).
    for (int remaining = acc.buffer.size(); remaining > acc.offset + 1; remaining--) {
        acc.buffer.removeFirst();
    }
}
In this case the offset is 1, so the buffer holds at most 2 values (offset + 1).