apache-flink flink-streaming flink-sql flink-cep flink-table-api

Issue while adding Tumble Window/Watermark with TIMESTAMP AS event_time to view


I need help fixing the errors below, which occur when adding a window with the Table API.

Environment: flinkVersion = "1.20.0"

I am trying to add a Tumble Window to a view. The expectation is to apply the LISTAGG() function over the batch of events received during the window.

  1. The app receives data from the source in JSON format, where the timestamp is a string.
  2. We convert the JSON data to a DataStream. While converting, we define the TypeInformation: a String attribute gets Types.STRING, and a timestamp-string attribute gets TypeConversions.fromDataTypeToLegacyInfo(DataTypes.TIMESTAMP_LTZ(3)).
  3. We store the DataStream in the Table API to perform SQL join operations.
  4. While defining the schema for the Table API, we declare the timestamp-string attribute as DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) (also tried with DataTypes.TIMESTAMP_LTZ(3)).
  5. The issue is that, while streaming, Flink tries to convert the timestamp value to an Instant and reports that it cannot be cast to Instant:

    Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.time.Instant (java.time.LocalDateTime and java.time.Instant are in module java.base of loader 'bootstrap')
        at org.apache.flink.api.common.typeutils.base.InstantSerializer.copy(InstantSerializer.java:31)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyNameBased(RowSerializer.java:177)
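The trace above shows InstantSerializer receiving a java.time.LocalDateTime, which suggests the row field declared as TIMESTAMP_LTZ(3) is being filled with a LocalDateTime rather than a java.time.Instant. A minimal sketch in plain Java (no Flink on the classpath) of parsing the string into an Instant before it goes into the row. The timestamp pattern and the choice of UTC are assumptions, since the question does not show the actual JSON; `TimestampParsing`, `toInstant` and `castsToInstant` are hypothetical names.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TimestampParsing {

    // Hypothetical pattern; the real format of the JSON timestamp is not shown in the question.
    private static final DateTimeFormatter LOCAL_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Parses a zone-less timestamp string and interprets it as UTC (an assumption). */
    static Instant toInstant(String raw) {
        LocalDateTime local = LocalDateTime.parse(raw, LOCAL_FORMAT);
        return local.toInstant(ZoneOffset.UTC);
    }

    /** Mimics the cast that fails in the stack trace: field value to Instant. */
    static boolean castsToInstant(Object fieldValue) {
        try {
            Instant ignored = (Instant) fieldValue;
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String raw = "2024-09-01 12:30:15.250";
        Object asLocal = LocalDateTime.parse(raw, LOCAL_FORMAT); // what the row seems to contain
        Object asInstant = toInstant(raw);                       // what the serializer expects
        System.out.println("LocalDateTime casts to Instant: " + castsToInstant(asLocal));
        System.out.println("Instant casts to Instant: " + castsToInstant(asInstant));
    }
}
```

If the string really has no zone information, declaring the column as TIMESTAMP(3) and keeping LocalDateTime values may be the other consistent pairing; which one fits depends on what the source timestamps mean.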

I would like to know which org.apache.flink.table.api.DataTypes type should be used to store the string as a timestamp in the Table API so that it can later be used for window operations.

I also tried creating my_view with CURRENT_TIMESTAMP AS event_time, as below, and then applying TUMBLE on my_view:

++++++++++++++++++++++++++++++++++++++++++++++++++++

    streamTableEnvironment
            .executeSql(
                    " CREATE VIEW my_view AS SELECT ACFT_NO_ID as ID, LEVEL_CD as Name, CURRENT_TIMESTAMP AS event_time FROM acCapLevelsMappedView")
            // + " WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND FROM acCapLevelsMappedView")
            .print();

    streamTableEnvironment.executeSql(" desc my_view").print();

    streamTableEnvironment
            .executeSql(
                    " CREATE VIEW my_aggregated_view AS SELECT ID,  LISTAGG( "
                            + " CONCAT_WS( "
                            + " CHR(31), "
                            + " Name "
                            + "            ), '\\u001E' ) , TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,"
                            + " TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end FROM my_view GROUP BY ID, TUMBLE(event_time, INTERVAL '5' MINUTE)")
            .print();
    streamTableEnvironment.executeSql(" select * from my_aggregated_view").print();

++++++++++++++++++++++++++++++++++++++++++++++++++++

I see the error below:

    Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) encountered.
        at org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:49)
        at org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:80)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420)
        at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178)
        at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211)
        at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)

I am trying to add a Tumble Window in the Table API and expect to apply LISTAGG to the data within the window.
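To make the expected result concrete, here is a rough plain-Java sketch of what GROUP BY ID, TUMBLE(event_time, INTERVAL '5' MINUTE) combined with LISTAGG computes: each event falls into the epoch-aligned 5-minute bucket that contains its timestamp, and the names in each (ID, bucket) group are joined with a separator. This only illustrates the grouping arithmetic on a finished list; it is not how Flink executes the query, and it ignores watermarks and late data. The class name, the sample events and the visible `|` separator are all made up for the illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class TumbleListAggSketch {

    /** Start of the epoch-aligned tumbling window that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, size.toMillis()));
    }

    /**
     * Joins names per (id, window start), similar to LISTAGG(Name, sep)
     * with GROUP BY ID, TUMBLE(event_time, size).
     * Each event is {id, name, ISO-8601 timestamp}.
     */
    static Map<String, String> aggregate(String[][] events, Duration size, String sep) {
        Map<String, StringJoiner> groups = new LinkedHashMap<>();
        for (String[] e : events) {
            Instant start = windowStart(Instant.parse(e[2]), size);
            String key = e[0] + "@" + start;
            groups.computeIfAbsent(key, k -> new StringJoiner(sep)).add(e[1]);
        }
        Map<String, String> result = new LinkedHashMap<>();
        groups.forEach((key, joined) -> result.put(key, joined.toString()));
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample events: the first two share a window, the third starts the next one.
        String[][] events = {
            {"A1", "L1", "2024-09-01T12:31:00Z"},
            {"A1", "L2", "2024-09-01T12:34:59Z"},
            {"A1", "L3", "2024-09-01T12:35:00Z"},
        };
        // A visible separator is used here; the query in the question passes '\u001E' instead.
        aggregate(events, Duration.ofMinutes(5), "|")
                .forEach((key, names) -> System.out.println(key + " -> " + names));
    }
}
```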


Solution

  • The error

    Window aggregate can only be defined over a time attribute column
    

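    means that the column passed to TUMBLE is not a time attribute: you haven't defined a watermark strategy on the timestamp column being used for windowed aggregation. CURRENT_TIMESTAMP only produces an ordinary TIMESTAMP_LTZ(3) value, and CREATE VIEW has no WATERMARK clause, so the watermark has to be declared in the schema of the underlying table (or a processing-time attribute such as PROCTIME() has to be used instead).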
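As a sketch of the processing-time route, the two views from the question could be rewritten with PROCTIME() in place of CURRENT_TIMESTAMP, which the planner should treat as a time attribute. The statements are kept as plain strings so the snippet compiles without Flink on the classpath; each one would be passed to streamTableEnvironment.executeSql(...) as in the question. The class name, the `proc_time` column and the `level_codes` alias are hypothetical, and this has not been run against the asker's job. Note that it windows by arrival time, not by the timestamp carried in the JSON; for event time, the watermark would instead need to be declared where the DataStream is converted to a table, for example through the `watermark(...)` method of the `Schema` builder.

```java
/** SQL statements only; each string is meant for streamTableEnvironment.executeSql(...). */
class ProctimeWindowSql {

    // PROCTIME() marks proc_time as a processing-time attribute.
    // CURRENT_TIMESTAMP would only yield an ordinary TIMESTAMP_LTZ(3) value.
    static final String CREATE_VIEW =
            "CREATE VIEW my_view AS"
                    + " SELECT ACFT_NO_ID AS ID, LEVEL_CD AS Name, PROCTIME() AS proc_time"
                    + " FROM acCapLevelsMappedView";

    // Same aggregation as in the question, windowed on proc_time instead of event_time.
    static final String CREATE_AGGREGATED_VIEW =
            "CREATE VIEW my_aggregated_view AS"
                    + " SELECT ID,"
                    + " LISTAGG(CONCAT_WS(CHR(31), Name), '\\u001E') AS level_codes,"
                    + " TUMBLE_START(proc_time, INTERVAL '5' MINUTE) AS window_start,"
                    + " TUMBLE_END(proc_time, INTERVAL '5' MINUTE) AS window_end"
                    + " FROM my_view"
                    + " GROUP BY ID, TUMBLE(proc_time, INTERVAL '5' MINUTE)";

    public static void main(String[] args) {
        // Prints the statements so they can be inspected before being handed to Flink.
        System.out.println(CREATE_VIEW);
        System.out.println(CREATE_AGGREGATED_VIEW);
    }
}
```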