apache-kafka, apache-flink, flink-streaming, flink-sql, flink-batch

AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin]


When I execute the following statement with Flink SQL, it fails with the error shown below:

Requirement

Group the data in user_behavior_kafka_table by the user_id field, then take the row with the largest ts value from each group.

Executed SQL

SELECT user_id, item_id, ts
FROM user_behavior_kafka_table AS a
WHERE ts = (SELECT MAX(b.ts)
            FROM user_behavior_kafka_table AS b
            WHERE a.user_id = b.user_id);

Flink version

1.11.2

error message

AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$0))], select=[user_id, item_id, ts, user_id0, EXPR$0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])

Job deployment

On YARN

Table messages

{"user_id":"aaa","item_id":"11-222-333","comment":"aaa access item at","ts":100}

{"user_id":"ccc","item_id":"11-222-334","comment":"ccc access item at","ts":200}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":300}

{"user_id":"bbb","item_id":"11-222-334","comment":"bbb access item at","ts":200}

{"user_id":"aaa","item_id":"11-222-333","comment":"aaa access item at","ts":200}

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":400}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":400}

{"user_id":"vvv","item_id":"11-222-334","comment":"vvv access item at","ts":200}

{"user_id":"bbb","item_id":"11-222-333","comment":"bbb access item at","ts":300}

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":300}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":100}

{"user_id":"bbb","item_id":"11-222-334","comment":"bbb access item at","ts":100}

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":400}

{"user_id":"bbb","item_id":"11-222-333","comment":"bbb access item at","ts":300}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":400}

{"user_id":"vvv","item_id":"11-222-334","comment":"vvv access item at","ts":200}


Solution

  • To get the results you expect from that query, it needs to be executed in batch mode. As a streaming query it produces update changes, which an AppendStreamTableSink cannot consume (hence the error above). Even if the sink could consume them, the query would produce a stream of results in which the last result for each user_id matches the expected output, preceded by additional, intermediate results; a batch-mode sketch follows the example below.

    For example, for user aaa, these results would appear:

    aaa 11-222-333 100
    aaa 11-222-333 200
    aaa 11-222-334 400
    

    but the row where ts=300 would be skipped, since it was never the row with the max value for ts.
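
    As a rough, untested sketch: with the 1.11 SQL Client, the session can be switched to batch mode before re-running the query, along these lines (note that batch execution needs a bounded source, so the unbounded Kafka table would have to be swapped for, e.g., an equivalent filesystem table holding the same data):

    SET execution.type=batch;
    -- then re-run the query above against a bounded copy of the data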

    If you want to make this work in streaming mode, try reformulating it as a top-n query:

    SELECT user_id, item_id, ts
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
      FROM user_behavior_kafka_table)
    WHERE row_num = 1;
    

    I believe this should work, but I'm not in a position to easily test it.
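
    If the top-n version behaves as intended, then based on the sample messages above, the final retained row per user should be:

    aaa 11-222-334 400
    bbb 11-222-333 300
    ccc 11-222-333 400
    vvv 11-222-334 200

    with intermediate results appearing along the way, as in the earlier example for user aaa.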