
Flink SQL Watermark Strategy After Join Operation


My problem is that I cannot use the ORDER BY clause after a JOIN operation. To reproduce the problem, create the following table:

CREATE TABLE stack (
    id INT PRIMARY KEY,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '100'
);

This table has a watermark strategy, and ts has the type TIMESTAMP(3) *ROWTIME*.

Flink SQL> DESC stack;
+------+------------------------+-------+---------+--------+----------------------------+
| name |                   type |  null |     key | extras |                  watermark |
+------+------------------------+-------+---------+--------+----------------------------+
|   id |                    INT | FALSE | PRI(id) |        |                            |
|   ts | TIMESTAMP(3) *ROWTIME* |  TRUE |         |        | `ts` - INTERVAL '1' SECOND |
+------+------------------------+-------+---------+--------+----------------------------+
2 rows in set

However, if I define a view as a simple self-join

CREATE VIEW self_join AS (
SELECT l.ts, l.id, r.id
FROM stack as l INNER JOIN stack as r
ON l.id=r.id
);

it loses the watermark strategy but not the type,

Flink SQL> DESC self_join;
+------+------------------------+-------+-----+--------+-----------+
| name |                   type |  null | key | extras | watermark |
+------+------------------------+-------+-----+--------+-----------+
|   ts | TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
|   id |                    INT | FALSE |     |        |           |
|  id0 |                    INT | FALSE |     |        |           |
+------+------------------------+-------+-----+--------+-----------+
3 rows in set

I assumed that the watermark strategy would be preserved, so that I could use ORDER BY after a JOIN operation, but this is not the case. How can I add a watermark strategy to the view again?

Thanks in advance.


Solution

  • Whenever Flink SQL performs a regular join in streaming mode (a join without any sort of temporal constraint), the newly created events emitted from the join aren't assigned timestamps. The watermarks are passed through, but without any timestamps to work with, the resulting stream doesn't have a time attribute, which in turn means that you can't sort the result or apply windowing to it.

    Why is this, and what can you do about it?

    Background

    Flink SQL uses time attributes (in this case, stack.ts) to optimize state retention. Because the stack stream/table has a time attribute, we know that this stream will be processed more-or-less in order, by time (the elements are constrained to be at most 1 second out-of-order). This then places a tight constraint on how much state must be retained in order to perform an operation like sorting this table -- a 1-second-long buffer will be enough.

    If stack didn't have a time attribute defined on it (i.e., a timestamp field with watermarking defined on it), then Flink SQL would refuse to sort it (in streaming mode) because doing so would require keeping around an unbounded amount of state, and it would be impossible to know how long to wait before emitting the first result.
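
    As a minimal sketch of this point, using the table and the regular-join view from the question (the exact planner error is not reproduced here, since its wording may differ between Flink versions):

    ```sql
    -- Accepted in streaming mode: ts is a time attribute, so the sort only
    -- needs to buffer rows until the watermark has passed them.
    SELECT id, ts FROM stack ORDER BY ts;

    -- Expected to be rejected in streaming mode while self_join is the regular
    -- join from the question, because its result has no usable time attribute.
    SELECT * FROM self_join ORDER BY ts;
    ```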

    What you can do

    If you modify the join to be either an interval join or a temporal join then the result will still have timestamps and watermarks. E.g., you could do this:

    CREATE VIEW self_join AS (
      SELECT l.ts, l.id, r.id
      FROM stack as l INNER JOIN stack as r
      ON l.id=r.id
      WHERE l.ts BETWEEN r.ts - INTERVAL '1' MINUTE AND r.ts
    );
    

    or you could do this:

    CREATE VIEW self_join AS (
      SELECT l.ts, l.id, r.id
      FROM stack AS l INNER JOIN stack FOR SYSTEM_TIME AS OF l.ts AS r
      ON l.id=r.id
    );
    

    In both of these cases, Flink's SQL engine will be able to retain less state than with the regular join, and it will be able to produce watermarks in the output stream/table.
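
    One way to check this, assuming self_join has been recreated with the interval-join definition above, is to sort the view by its time attribute:

    ```sql
    -- Assumes the interval-join version of self_join; its ts column is l.ts,
    -- which should still be a time attribute after an interval join.
    SELECT * FROM self_join ORDER BY ts;
    ```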

    Another possibility would be to materialize the result of the join to an intermediate table, which you can arrange so it has a time attribute.
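
    A rough sketch of that approach follows. The table name joined, the Kafka topic, the broker address, the consumer group, and the 1-second watermark delay are all assumptions made for illustration; any connector that can be both written to and read from should work, and the delay has to reflect how out-of-order the join output really is.

    ```sql
    -- Hypothetical intermediate table; the connector options are placeholders.
    CREATE TABLE joined (
      ts   TIMESTAMP(3),
      l_id INT,
      r_id INT,
      -- Assumed bound on out-of-orderness of the join output.
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'joined',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'joined-reader',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );

    -- Write the regular join result out. The cast turns the time attribute
    -- into a plain timestamp before it enters the join.
    INSERT INTO joined
    SELECT CAST(l.ts AS TIMESTAMP(3)), l.id, r.id
    FROM stack AS l INNER JOIN stack AS r
    ON l.id = r.id;

    -- Reading the intermediate table back gives ts a watermark again.
    SELECT * FROM joined ORDER BY ts;
    ```

    The inner join of two append-only tables only emits inserts, which is why a plain append-only sink is enough in this sketch.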

    Yet another possible solution would be to convert the result table to a DataStream, then use the DataStream API to apply watermarking, and then convert that stream back to a table. But that's only going to make sense if you have some domain knowledge that allows you to know how out-of-order the result stream might be -- and you probably could have expressed that same information as either an interval or temporal join.