Problem Description: We're using Apache Flink's Table API on top of a CDC (Change Data Capture) stream, and we want to publish only the final aggregated state after a row update, not intermediate partial updates.
Scenario: A Flink table, table1, is created from a CDC DataStream with composite primary keys.
We apply LISTAGG, grouped by a subset of the composite key, to concatenate fields from all rows sharing that key.
When any of those rows is updated, we want to emit only one event:
The event must contain the updated row along with the other rows for the same key (via LISTAGG).
However, we are observing two emissions:
The first is missing the updated row.
The second contains the correct final state.
What We Expect: We want to emit only the final +U that includes:
The updated row
Other rows for the same key (via LISTAGG)
We do not want intermediate partial updates being emitted.
What We Observe: If an ID has only one row, a single correct event is emitted.
If an ID has multiple rows, updating one row causes Flink to emit:
-U (removal of old row)
+U (incomplete, missing the updated row)
[❌ Published — partial event]
-U (removal of other rows)
+U (full row set with updated values)
[✅ Published — correct final event]
So we get two events, one incomplete and one final. We only want the final one.
Reproducible Steps:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create a Table from CDC source
Table table1 = tableEnv.fromDataStream(cdcDataStream);
// Create a query with LISTAGG and GROUP BY
String sqlQuery =
"SELECT t1.ID, LISTAGG(CONCAT_WS(CHR(31), t1.c1, t1.c2), '\u001E') AS t1_concat " +
"FROM table1 t1 " +
"WHERE t1.ID='ID' " +
"GROUP BY t1.ID";
// Run the query
Table resultTable = tableEnv.sqlQuery(sqlQuery);
// Convert to changelog stream and process
DataStream<JsonNode> stream = tableEnv
        .toChangelogStream(resultTable)
        .filter(new UpdateBeforeFilter()) // filters out -U records
        .map(new RowToJsonNodeMapper());
stream
        .keyBy(node -> node.get("ID").asText()) // key by the grouped ID field
        .sinkTo(new KafkaSinkWriter());
env.execute();
Attempted Workaround:
We tried filtering out RowKind.UPDATE_BEFORE (-U) records using a custom UpdateBeforeFilter. However, we're still getting both +U events, the partial and the final one.
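For reference, the filter is essentially the following (a simplified sketch; our real implementation behaves the same way):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Keeps everything except UPDATE_BEFORE, so both +U records
// (the partial one and the final one) still pass through.
public class UpdateBeforeFilter implements FilterFunction<Row> {
    @Override
    public boolean filter(Row row) {
        return row.getKind() != RowKind.UPDATE_BEFORE;
    }
}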
Question: How can we emit only the final +U event that includes the updated row along with the other grouped rows, and suppress the intermediate +U emissions that contain partial/incomplete data?
Additional Debug Info:
The SQL uses LISTAGG, GROUP BY, and a CDC-based table.
The CDC changelog arrives as -U, +U, -U, +U.
Flink translates the changelog into intermediate aggregation states, which are emitted before the final state is reached.
Using toChangelogStream() and filtering on RowKind isn't sufficient, as both +U records pass the filter.
Answer: Flink SQL doesn't behave in the same way as a database. Instead, it incrementally processes the input stream(s), producing at each step the corresponding result. After consuming the entire input (assuming the input ends at some point), it will have produced the same result, but the output stream will show all of the steps it passed through along the way.
The standard answer for queries like yours that produce an update stream is to use a sink that reads an update stream and maintains a materialized view of the result, for example a JDBC database.
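As a sketch of that approach: with the JDBC connector (flink-connector-jdbc) on the classpath, declaring the group-by key as the sink's primary key makes the connector run in upsert mode, so each -U/+U pair collapses into an UPDATE of the stored row and downstream readers only ever see the final state. The table name, URL, and column types below are placeholders, not taken from your setup.

// Sink table with a primary key => the JDBC connector upserts rather than appending.
tableEnv.executeSql(
    "CREATE TABLE agg_result (" +
    "  ID STRING," +
    "  t1_concat STRING," +
    "  PRIMARY KEY (ID) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:postgresql://localhost:5432/mydb'," + // placeholder URL
    "  'table-name' = 'agg_result'" +
    ")");

// Write the aggregation into the upsert sink; the database always holds
// the materialized final state per ID.
resultTable.executeInsert("agg_result");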
Or you can manipulate the update stream, as you've tried to do, but you'll need to introduce a stateful operator (a KeyedProcessFunction) to do the filtering in a smarter way. The same sort of stateful filtering could also be done in a ProcessTableFunction, which would avoid having to convert the Table to a DataStream.
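Here is a minimal sketch of that kind of stateful filter, assuming the partial and final +U for a key arrive close together in time. It debounces the stream: it keeps only the latest upsert per key and emits it once no newer update has arrived within a small delay. The class name and the 100 ms delay are illustrative; the delay trades latency against the risk of still emitting a partial result.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// Debounces the changelog per key: only the last upsert seen within
// DEBOUNCE_MS is emitted, which suppresses the intermediate partial +U.
public class DebounceLatestUpdate extends KeyedProcessFunction<String, Row, Row> {

    private static final long DEBOUNCE_MS = 100L; // illustrative delay

    private transient ValueState<Row> pending;  // latest upsert for this key
    private transient ValueState<Long> timerTs; // currently registered timer

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending", Row.class));
        timerTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timerTs", Long.class));
    }

    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
        if (row.getKind() == RowKind.UPDATE_BEFORE || row.getKind() == RowKind.DELETE) {
            return; // drop retractions; only the latest upsert matters
        }
        pending.update(row);
        // Re-arm the timer: cancel the previous one so this key only fires
        // once it has been quiet for DEBOUNCE_MS.
        Long previous = timerTs.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long fireAt = ctx.timerService().currentProcessingTime() + DEBOUNCE_MS;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        timerTs.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
        Row latest = pending.value();
        if (latest != null) {
            out.collect(latest); // only the final +U survives the debounce
        }
        pending.clear();
        timerTs.clear();
    }
}

You would key the changelog stream by the grouped ID before applying it, e.g. tableEnv.toChangelogStream(resultTable).keyBy(r -> (String) r.getField("ID")).process(new DebounceLatestUpdate()), and then map to JSON and sink as before.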