Need help on fixing the below errors while adding Window on Table API.
Env is with flinkVersion = "1.20.0"
I'm trying to add Tumble Window to View and expecting to apply LISTAGG()
function over the bunch of events received during the Window time.
DataStream<Row>
. While converting, we defined the TypeInformation
if attribute is of String then Types.STRING
and if attribute is related to timestamp string given TypeConversions.fromDataTypeToLegacyInfo(DataTypes.TIMESTAMP_LTZ(3))
DataStream<Row>
to Table API to perform SQL Join operations.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)
(also tried with DataTypes.TIMESTAMP_LTZ(3)
)java.timestamp
to Instant
and saying I can't cast to Instant
.Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.time.Instant (java.time.LocalDateTime and java.time.Instant are in module java.base of loader 'bootstrap')
at org.apache.flink.api.common.typeutils.base.InstantSerializer.copy(InstantSerializer.java:31)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyNameBased(RowSerializer.java:177)
I'd like to know what the right org.apache.flink.table.api.DataTypes
to use to store the String as Timestamp in Table API that can be later used for Window operations.
I also tried by creating my_view with CURRENT_TIMESTAMP AS event_time
as below and later adding TUMBLE on the my_view
streamTableEnvironment
.executeSql(
" CREATE VIEW my_view AS SELECT ACFT_NO_ID as ID, LEVEL_CD as Name, CURRENT_TIMESTAMP AS event_time FROM acCapLevelsMappedView")
// + " WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND FROM acCapLevelsMappedView")
.print();
streamTableEnvironment.executeSql(" desc my_view").print();
streamTableEnvironment
.executeSql(
" CREATE VIEW my_aggregated_view AS SELECT ID, LISTAGG( "
+ " CONCAT_WS( "
+ " CHR(31), "
+ " Name "
+ " ), '\\u001E' ) , TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,"
+ " TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end FROM my_view GROUP BY ID, TUMBLE(event_time, INTERVAL '5' MINUTE)")
.print();
streamTableEnvironment.executeSql(" select * from my_aggregated_view").print();
I get the below error:
Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) encountered.
at org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:49)
at org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:80)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420)
at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178)
at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211)
at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
I am trying to add Tumble Window to on Table API and expecting to apply LISTAGG
on the data with in the Window.
The error
Window aggregate can only be defined over a time attribute column
means that you haven't defined a watermark strategy on the timestamp column being used for windowed aggregation.