I'm trying to understand how watermarks work in Flink SQL. I've created the following tables:
CREATE TABLE currency_rates (
  currency STRING,
  conversion_rate STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time,
  PRIMARY KEY (currency) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'currency_rates1',
  'key.format' = 'raw',
  'value.format' = 'avro',
  'properties.auto.offset.reset' = 'earliest',
  'value.fields-include' = 'ALL',
  'scan.watermark.idle-timeout' = '1000'
);
CREATE TABLE orders (
  order_id STRING,
  price DECIMAL(32,2),
  currency STRING,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'orders1',
  'value.format' = 'avro',
  'properties.auto.offset.reset' = 'latest',
  'value.fields-include' = 'ALL'
);
To test this, I run the following query:
SELECT
  orders.order_id,
  orders.price,
  orders.currency,
  currency_rates.conversion_rate,
  orders.order_time,
  currency_rates.update_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
  ON orders.currency = currency_rates.currency;
These are the inserts I’m making:
INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);
The following result is displayed:
Then, I run:
INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP - interval '5' minutes);
INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);
And the result becomes:
I don't understand why record ORD007 appears. Its order_time is 5 minutes earlier than the other events, so I expected it to be excluded as a late, out-of-order event.
Flink doesn't automatically drop late events. Events whose timestamps are behind the current watermark may be ignored by operations that are triggered by watermarks (window aggregations, for example), but this depends on the operator. In this case, ORD007 doesn't produce a complete result because the corresponding conversion rate info has already been dropped from state. Since this is a LEFT JOIN, however, it's still appropriate to produce a result for the order.
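If you do want late orders discarded, you have to filter them out yourself. The following is a minimal sketch, assuming Flink 1.15 or later (where the built-in CURRENT_WATERMARK function is available); it has not been tested against this exact setup. The first query shows each order's event time next to the watermark that the operator has seen for the orders stream, so a late row like ORD007 should show an order_time at or below wm. The second query applies the filter pattern from the Flink documentation before the temporal join. The alias names wm and o are only illustrative.

```sql
-- Diagnostic: compare each order's event time with the current watermark
-- of the orders stream (CURRENT_WATERMARK requires Flink 1.15+).
SELECT
  order_id,
  order_time,
  CURRENT_WATERMARK(order_time) AS wm
FROM orders;

-- Drop late orders before they reach the temporal join.
-- CURRENT_WATERMARK returns NULL until the first watermark arrives,
-- so rows seen before that point are kept.
SELECT
  o.order_id,
  o.price,
  o.currency,
  currency_rates.conversion_rate,
  o.order_time,
  currency_rates.update_time
FROM (
  SELECT *
  FROM orders
  WHERE CURRENT_WATERMARK(order_time) IS NULL
     OR order_time > CURRENT_WATERMARK(order_time)
) AS o
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time
  ON o.currency = currency_rates.currency;
```

Because watermarks are emitted periodically, the watermark seen by the filter can lag slightly behind the latest order_time minus 5 seconds, so a row that is only barely late may still pass through.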