When I use Flink SQL to execute the following statement, the error below is reported:
Request
Group the data in user_behavior_kafka_table by the user_id field, then take
the row with the largest ts value in each group.
Executed SQL
SELECT user_id,item_id,ts FROM user_behavior_kafka_table AS a
WHERE ts = (select max(b.ts)
FROM user_behavior_kafka_table AS b
WHERE a.user_id = b.user_id );
Flink version
1.11.2
error message
AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$0))], select=[user_id, item_id, ts, user_id0, EXPR$0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
Job deploy
On Yarn
Table Message
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa access item at","ts":100}
{"user_id":"ccc","item_id":"11-222-334","comment":"ccc access item at","ts":200}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":300}
{"user_id":"bbb","item_id":"11-222-334","comment":"bbb access item at","ts":200}
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa access item at","ts":200}
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":400}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":400}
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv access item at","ts":200}
{"user_id":"bbb","item_id":"11-222-333","comment":"bbb access item at","ts":300}
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":300}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":100}
{"user_id":"bbb","item_id":"11-222-334","comment":"bbb access item at","ts":100}
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":400}
{"user_id":"bbb","item_id":"11-222-333","comment":"bbb access item at","ts":300}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc access item at","ts":400}
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv access item at","ts":200}
To get the results you expect from that query, it needs to be executed in batch mode. As a streaming query, the Flink SQL planner can't handle it, and even if it could, it would produce a stream of results in which the last result for each user_id matches the expected output, preceded by additional, intermediate results.
For example, for user aaa, these results would appear:
aaa 11-222-333 100
aaa 11-222-333 200
aaa 11-222-334 400
but the row where ts=300 would be skipped, since it was never the row with the max value for ts.
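The intermediate results above can be illustrated with a small simulation (a hypothetical sketch in Python, not Flink code): tracking a running maximum of ts per user_id and emitting a result each time a record raises that maximum reproduces exactly the three rows shown for user aaa, and shows why the ts=300 row never appears.

```python
# Sketch: simulate the streaming "row with max ts per user" semantics.
# The records are user aaa's rows from the sample data, in arrival order.
records = [
    ("aaa", "11-222-333", 100),
    ("aaa", "11-222-333", 200),
    ("aaa", "11-222-334", 400),
    ("aaa", "11-222-334", 300),  # never the max for aaa -> no output
    ("aaa", "11-222-334", 400),  # ties, does not exceed the max -> no output
]

running_max = {}  # user_id -> (item_id, ts) of the current max row
emitted = []      # intermediate results a streaming query would produce

for user_id, item_id, ts in records:
    current = running_max.get(user_id)
    if current is None or ts > current[1]:
        running_max[user_id] = (item_id, ts)
        emitted.append((user_id, item_id, ts))

for row in emitted:
    print(row)
```

Only the last emitted row per user_id is the final answer; everything before it is the intermediate output a streaming sink would have to retract or overwrite.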
If you want to make this work in streaming mode, try reformulating it as a top-n query:
SELECT user_id, item_id, ts
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
  FROM user_behavior_kafka_table)
WHERE row_num = 1;
I believe this should work, but I'm not in a position to easily test it.