
How do watermarks work in FlinkSQL with late events


I'm trying to understand how watermarks work in FlinkSQL. I’ve created the following tables:

  CREATE TABLE currency_rates (
    currency        STRING,
    conversion_rate STRING,
    update_time     TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'currency_rates',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'currency_rates1',
    'key.format' = 'raw',
    'value.format' = 'avro',
    'properties.auto.offset.reset' = 'earliest',
    'value.fields-include' = 'ALL',
    'scan.watermark.idle-timeout' = '1000'
  );


  CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'orders1',
    'value.format' = 'avro',
    'properties.auto.offset.reset' = 'latest',
    'value.fields-include' = 'ALL'
  );

To test this, I run the following query:

  SELECT
    orders.order_id,
    orders.price,
    orders.currency,
    currency_rates.conversion_rate,
    orders.order_time,
    currency_rates.update_time
  FROM orders
  LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
    ON orders.currency = currency_rates.currency;

These are the inserts I’m making:

  INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
  INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
  INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);

The following result is displayed:

[screenshot of query results]

Then, I run:

  INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
  INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP - interval '5' minutes);
  INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);

And:

[screenshot of query results]

I don't understand why record ORD007 appears. Its order_time is 5 minutes in the past, so it arrived late and out of order, and I expected it to be excluded from the result.
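One way to check whether ORD007 really is behind the watermark when it is read is to compare each order's timestamp with the watermark of the orders stream at that moment. This is a minimal sketch, assuming Flink 1.15 or later, where the built-in CURRENT_WATERMARK function is available; the aliases wm_at_arrival and is_late are illustrative names, not part of the original tables.

```sql
-- Inspect, per order, the watermark of the orders stream at the moment
-- the row is processed. CURRENT_WATERMARK returns NULL until the first
-- watermark has been emitted, so is_late is also NULL for the first rows.
-- A row counts as late when its event time is at or behind the watermark.
SELECT
  order_id,
  order_time,
  CURRENT_WATERMARK(order_time)          AS wm_at_arrival,
  order_time <= CURRENT_WATERMARK(order_time) AS is_late
FROM orders;
```

With this query running, repeating the order inserts should show is_late = TRUE for ORD007, provided the earlier orders have already advanced the watermark past ORD007's order_time. Watermarks are emitted periodically rather than per row, so the value shown can lag slightly behind the latest order. This query runs as its own job, so it only approximates the watermark seen inside the join.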


Solution

  • Flink doesn't automatically drop late events. An event whose timestamp is behind the current watermark is ignored only by some of the operations whose results are triggered by watermarks; for example, event-time window aggregations drop rows whose window has already been closed by the watermark. In this case, ORD007 doesn't produce a complete result, because the corresponding conversion rate has already been dropped from state. But this is a LEFT JOIN, so it's still appropriate to emit ORD007, with NULLs in the currency_rates columns.
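If the intended behavior is to discard late orders instead of joining them, the filter has to be written explicitly. This is a minimal sketch, assuming Flink 1.15 or later for CURRENT_WATERMARK; the aliases o and r are introduced only for readability. It keeps the original LEFT JOIN but first removes orders whose order_time is at or behind the orders watermark when they arrive.

```sql
-- Variant of the original query that drops late orders before the
-- temporal join. Rows pass the filter while no watermark exists yet
-- (CURRENT_WATERMARK is NULL) or when they are ahead of the watermark.
SELECT
  o.order_id,
  o.price,
  o.currency,
  r.conversion_rate,
  o.order_time,
  r.update_time
FROM (
  SELECT *
  FROM orders
  WHERE CURRENT_WATERMARK(order_time) IS NULL
     OR order_time > CURRENT_WATERMARK(order_time)
) AS o
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency = r.currency;
```

With this filter, ORD007 should no longer appear, assuming it is behind the watermark when it is read. The check uses the watermark at the moment each row is processed, so whether a row is dropped depends on arrival order, not only on its timestamp.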