How can I use Flink SQL to aggregate each list within a DataStream<List>?
// source
DataSource<List<Row>> source;
// listStream
SingleOutputStreamOperator<List<Row>> listRowStream = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "sourceName")
    .returns(Types.LIST(Types.ROW_NAMED(
        new String[]{"batch_id", "name", "value"},
        Types.STRING, Types.STRING, Types.INT)));
The data of listRowStream looks like:
data sample 1:
[
Row["1", "a", 1]
Row["1", "a", 2]
Row["1", "b", 3]
],
data sample 2:
[
Row["2", "a", 4]
Row["2", "b", 5]
Row["2", "b", 6]
Row["2", "c", 2]
]
I have two implementation approaches:
1. Use a WITH clause to explode the array, then use a window function to aggregate by batch_id and name. The SQL would be roughly:
   WITH (explode the array) SELECT batch_id, name, SUM(value) FROM TABLE(TUMBLE) GROUP BY batch_id, name
2. Use a FlatMap function to turn the array into individual records, then use a window function to aggregate by batch_id and name. The SQL would be roughly:
   SELECT batch_id, name, SUM(value) FROM TABLE(TUMBLE) GROUP BY batch_id, name
The drawback of both approaches is the window size. If it is set too large, for example 1 minute, it wastes resources. If it is set too small, there is no guarantee that all records with the same batch_id fall within the same window.
In fact, I don't need a window. Each Kafka message is a List, and each List holds data from a single batch. Is it possible to aggregate each List directly, without using windows or Flink state?
Thanks!
With Flink SQL, I would suggest implementing a user-defined table function that takes a list of rows as input and emits the aggregated rows (one per batch_id and name) as its output. Because each call sees a complete batch, you don't have to introduce windows or state.
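To make the idea concrete, here is a minimal sketch of the per-list aggregation in plain Java, with no Flink dependency so it can be run on its own. The class and method names (BatchListAggregator, Item, Total, aggregate) are made up for illustration. In an actual job I would expect this logic to sit in the eval method of a class extending TableFunction<Row>, emitting each result with collect(...) and being called from SQL through LATERAL TABLE(...), but treat that wiring as an assumption to check against the Flink version you use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchListAggregator {

    /** One element of the incoming list: (batch_id, name, value). Hypothetical helper type. */
    public static final class Item {
        final String batchId;
        final String name;
        final int value;

        public Item(String batchId, String name, int value) {
            this.batchId = batchId;
            this.name = name;
            this.value = value;
        }
    }

    /** One output row: (batch_id, name, SUM(value)). Hypothetical helper type. */
    public static final class Total {
        public final String batchId;
        public final String name;
        public final long sum;

        Total(String batchId, String name, long sum) {
            this.batchId = batchId;
            this.name = name;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return batchId + ", " + name + ", " + sum;
        }
    }

    /**
     * Aggregates a single list (one Kafka message) by (batch_id, name).
     * The map is local to this call, so nothing is kept between messages:
     * no window and no Flink state are involved.
     */
    public static List<Total> aggregate(List<Item> batch) {
        // LinkedHashMap keeps groups in first-seen order.
        Map<List<String>, Long> sums = new LinkedHashMap<>();
        for (Item item : batch) {
            List<String> key = Arrays.asList(item.batchId, item.name);
            sums.merge(key, (long) item.value, Long::sum);
        }
        List<Total> out = new ArrayList<>();
        for (Map.Entry<List<String>, Long> e : sums.entrySet()) {
            // In a table function, this is where each result row would be emitted.
            out.add(new Total(e.getKey().get(0), e.getKey().get(1), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Data sample 1 from the question.
        List<Item> sample1 = Arrays.asList(
            new Item("1", "a", 1),
            new Item("1", "a", 2),
            new Item("1", "b", 3));
        for (Total t : aggregate(sample1)) {
            System.out.println(t); // prints "1, a, 3" then "1, b, 3"
        }
    }
}
```

Since every call works on one complete list, the result for a batch is emitted as soon as its message is processed, which avoids the window-size trade-off described in the question.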