apache-flink

Does a processing-time temporal join support the FOR SYSTEM_TIME AS OF syntax or not?


I am reading Flink 1.20 doc at https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

It says that a processing-time temporal join cannot currently be written with the FOR SYSTEM_TIME AS OF syntax, and that LATERAL TABLE should be used for the temporal join instead:

Currently, the FOR SYSTEM_TIME AS OF syntax used in temporal join with latest version of any view/table is not support yet, you can use temporal table function syntax as following:

SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
 

But the lookup join section (https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/joins/#lookup-join) does use the FOR SYSTEM_TIME AS OF syntax:

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

So the processing-time temporal join section says the FOR SYSTEM_TIME AS OF syntax is not supported for temporal joins, while the lookup join section uses exactly that syntax with a processing-time attribute?

Is the FOR SYSTEM_TIME AS OF syntax supported in temporal joins or not? I am completely confused.


Solution

  • Lookup joins and temporal joins use the same syntax, but have been given different names to help express their different usage patterns. Lookup joins require a special source connector that supports lookups, while temporal joins that use the FOR SYSTEM_TIME AS OF syntax use any of the standard streaming sources.

    It's more or less intractable for lookup joins to use event time, and the community has chosen not to support processing time temporal joins using this syntax because the behavior can be confusing.

    Why do these two operations use the same syntax? Well, this is the standard way to express a temporal join -- Flink didn't invent temporal joins, this is how they are done in SQL. And semantically a lookup join really is the same thing as a processing time temporal join. It's a one-off join, using the currently available record for a given key.

    See "What is the difference between Lookup and Processing Time Temporal join in Flink?" for more on this topic.
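
    To make the contrast concrete, here is a minimal sketch of the remaining case: an event-time temporal join written with FOR SYSTEM_TIME AS OF against a versioned table backed by an ordinary streaming source. The table names, column names, topic names, and broker address are hypothetical and not taken from the question. The sketch assumes the Kafka and upsert-kafka connectors are available, and that the right-hand table has a primary key and a watermark so that Flink can treat it as a versioned table.

    ```sql
    -- Hypothetical append-only orders stream with an event-time attribute.
    CREATE TEMPORARY TABLE orders (
      order_id   STRING,
      price      DECIMAL(32, 2),
      currency   STRING,
      order_time TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'orders',                                  -- assumed topic name
      'properties.bootstrap.servers' = 'localhost:9092',   -- assumed broker address
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );

    -- Hypothetical versioned table: primary key plus event-time attribute.
    CREATE TEMPORARY TABLE currency_rates (
      currency        STRING,
      conversion_rate DECIMAL(32, 2),
      update_time     TIMESTAMP(3),
      WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
      PRIMARY KEY (currency) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'currency_rates',                          -- assumed topic name
      'properties.bootstrap.servers' = 'localhost:9092',   -- assumed broker address
      'key.format' = 'json',
      'value.format' = 'json'
    );

    -- Event-time temporal join: each order is matched with the rate
    -- that was valid at the order's own event time.
    SELECT o.order_id, o.price, o.currency, r.conversion_rate, o.order_time
    FROM orders AS o
      LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
        ON o.currency = r.currency;
    ```

    The join clause has the same shape as the lookup join in the question. What differs is the time attribute (event time here, processing time there) and the kind of table on the right-hand side (a versioned streaming table here, a lookup-capable connector such as JDBC there).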