Need help on fixing the below errors while adding Window on Table API.
Env is with flinkVersion = "1.20.0"
I trying to add Tumble Window to View, expectation is to apply LISTAGG() function over the bunch of events received during the Window time.
Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.time.Instant (java.time.LocalDateTime and java.time.Instant are in module java.base of loader 'bootstrap') at org.apache.flink.api.common.typeutils.base.InstantSerializer.copy(InstantSerializer.java:31) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyNameBased(RowSerializer.java:177)
Like to know what is the right org.apache.flink.table.api.DataTypes to be used to store the String as Timestamp in Table API that can be later used for Window operations.
I also tried by creating my_view with CURRENT_TIMESTAMP AS event_time as below and later adding TUMBLE on the my_view
++++++++++++++++++++++++++++++++++++++++++++++++++++ streamTableEnvironment .executeSql( " CREATE VIEW my_view AS SELECT ACFT_NO_ID as ID, LEVEL_CD as Name, CURRENT_TIMESTAMP AS event_time FROM acCapLevelsMappedView") // + " WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND FROM acCapLevelsMappedView") .print();
streamTableEnvironment.executeSql(" desc my_view").print();
streamTableEnvironment
.executeSql(
" CREATE VIEW my_aggregated_view AS SELECT ID, LISTAGG( "
+ " CONCAT_WS( "
+ " CHR(31), "
+ " Name "
+ " ), '\\u001E' ) , TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,"
+ " TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end FROM my_view GROUP BY ID, TUMBLE(event_time, INTERVAL '5' MINUTE)")
.print();
streamTableEnvironment.executeSql(" select * from my_aggregated_view").print();
++++++++++++++++++++++++++++++++++++++++++++++++++++
I see below error Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) encountered. at org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:49) at org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:80) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420) at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178) at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211) at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
I am trying to add Tumble Window to on Table API and expecting to apply LISTAGG on the data with in the Window.
The error
Window aggregate can only be defined over a time attribute column
means that you haven't defined a watermark strategy on the timestamp column being used for windowed aggregation.