apache-flink flink-streaming flink-sql

How can I use Flink SQL to aggregate each list within a DataStream<List<Row>>?


How can I use Flink SQL to aggregate each list within a DataStream<List<Row>>?

// source: each Kafka message deserializes to one List<Row>
KafkaSource<List<Row>> source;

// listStream
SingleOutputStreamOperator<List<Row>> listRowStream = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "sourceName")
    .returns(Types.LIST(Types.ROW_NAMED(new String[]{"batch_id", "name", "value"}, Types.STRING, Types.STRING, Types.INT)));

The data of listRowStream looks like:
data sample 1:
[
    Row["1", "a", 1]
    Row["1", "a", 2]
    Row["1", "b", 3]
],
data sample 2:
[
    Row["2", "a", 4]
    Row["2", "b", 5]
    Row["2", "b", 6]
    Row["2", "c", 2]
]

I have two implementation approaches:

  1. Use a WITH clause to explode the array, then use a window function to aggregate by batch_id and name. The SQL would look like:

    WITH (explode the array) SELECT batch_id, name, SUM(value) FROM TABLE(TUMBLE) GROUP BY batch_id, name

  2. Use a FlatMap function to turn the array into individual records, then use a window function to aggregate by batch_id and name. The SQL would look like:

    SELECT batch_id, name, SUM(value) FROM TABLE(TUMBLE) GROUP BY batch_id, name

Both approaches share a drawback. If the window is too long (for example, 1 minute), it wastes resources. If the window is too short, records with the same batch_id may not all fall within the same window.

In fact, I don't need a window. Each Kafka message is a List, and each List holds the data of a single batch. Is it possible to aggregate each List directly, without using windows or Flink state?

Thanks!


Solution

  • With Flink SQL, I would suggest implementing a user-defined table function (UDTF) that takes a list of rows as input and emits the aggregated rows as its output. Because each call receives one complete list, you don't have to introduce windows or state.
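
As a rough illustration of the per-list aggregation such a function would perform, here is a minimal plain-Java sketch with no Flink dependency. The names BatchListAggregator, Item, and sumByBatchAndName are hypothetical and do not come from the question or from Flink. In an actual Flink TableFunction, the same loop would presumably live in the eval method, with each result emitted through collect(...) instead of being returned as a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: plain Java only, not a Flink class.
public class BatchListAggregator {

    /** One record, mirroring the (batch_id, name, value) row type from the question. */
    public static final class Item {
        public final String batchId;
        public final String name;
        public final int value;

        public Item(String batchId, String name, int value) {
            this.batchId = batchId;
            this.name = name;
            this.value = value;
        }

        @Override
        public String toString() {
            return batchId + "," + name + "," + value;
        }
    }

    /**
     * Sums value per (batch_id, name) within a single list.
     * Only local variables are used, so nothing is kept between calls:
     * no window and no state.
     */
    public static List<Item> sumByBatchAndName(List<Item> batch) {
        // LinkedHashMap keeps groups in first-seen order.
        Map<List<String>, Integer> sums = new LinkedHashMap<>();
        for (Item item : batch) {
            sums.merge(Arrays.asList(item.batchId, item.name), item.value, Integer::sum);
        }
        List<Item> result = new ArrayList<>();
        for (Map.Entry<List<String>, Integer> e : sums.entrySet()) {
            // In a Flink TableFunction (assumption), this is where collect(...) would be called.
            result.add(new Item(e.getKey().get(0), e.getKey().get(1), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Data sample 1 from the question.
        List<Item> sample = Arrays.asList(
                new Item("1", "a", 1),
                new Item("1", "a", 2),
                new Item("1", "b", 3));
        for (Item item : sumByBatchAndName(sample)) {
            System.out.println(item); // prints 1,a,3 then 1,b,3
        }
    }
}
```

Since every call handles exactly one list, the result for a batch is available as soon as its Kafka message is processed, which is why no window size has to be chosen.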