I am using Flink 1.12.0. I am trying to convert a data stream into a table (tableA) and run a SQL query on it that aggregates over a window, as shown below. I am using the f2 column for the window because it is a timestamp field.
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
    props.setProperty("schema.registry.url", xxx);
    props.setProperty("group.id", "test");
    props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
    props.put("client.id", "flink-kafka-example");

    FlinkKafkaConsumer<PlaybackListening> kafkaConsumer = new FlinkKafkaConsumer<>(
            "test-topic",
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                    Avrotest.class, prodSchemaRegistryURL),
            props);

    DataStreamSource<Avrotest> stream =
            env.addSource(kafkaConsumer);

    Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"), $("f2"));

    Table result =
            tEnv.sqlQuery("SELECT f0, sum(f1),f2 FROM "
                    + tableA + " GROUP BY TUMBLE(f2, INTERVAL '1' HOUR) ,f1" );

    tEnv.toAppendStream(result,user.class).print();
    env.execute("Flink kafka test");
    }
When I execute the above code, I get:
    Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(6) encountered.
        at org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50)
        at org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:81)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
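To use the Table API to perform event-time windowing on your DataStream, you first need to assign timestamps and watermarks. Do this before calling fromDataStream, and declare the event-time attribute there with rowtime() on a field expression. A plain TIMESTAMP column such as f2 is not a time attribute, which is why the planner rejects the window.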
With Kafka, it's generally best to call assignTimestampsAndWatermarks directly on the FlinkKafkaConsumer. See the watermark docs, Kafka connector docs, and Flink SQL docs for more info.
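As a rough sketch of how the question's code might change, assuming the Flink 1.12 DataStream and Table APIs: the fragment below reuses env, tEnv and kafkaConsumer from the question, with the consumer typed as FlinkKafkaConsumer<Avrotest>. The getter getF2(), its java.time.Instant return type, the 5-second out-of-orderness bound, and the column names event_time, f1_sum and window_start are all assumptions made for illustration; adapt them to the real Avro schema. The query is also adjusted to group by f0 and select the window start, since the original SELECT list does not match its GROUP BY.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

// env, tEnv and kafkaConsumer (a FlinkKafkaConsumer<Avrotest>) are created as in the question.

// Assign timestamps and watermarks on the Kafka consumer itself.
// Assumption: getF2() returns a java.time.Instant; change the extractor if the type differs.
kafkaConsumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Avrotest>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // illustrative bound
                .withTimestampAssigner(
                        (event, previousTimestamp) -> event.getF2().toEpochMilli()));

DataStream<Avrotest> stream = env.addSource(kafkaConsumer);

// Append an event-time attribute (hypothetical name "event_time") that is filled
// from the record timestamps assigned above.
Table tableA = tEnv.fromDataStream(
        stream, $("f0"), $("f1"), $("event_time").rowtime());

// Window over the time attribute rather than the plain TIMESTAMP column f2.
Table result = tEnv.sqlQuery(
        "SELECT f0, SUM(f1) AS f1_sum, "
                + "TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start "
                + "FROM " + tableA
                + " GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR), f0");

// Row.class is used here so the sketch does not depend on a custom result POJO.
tEnv.toAppendStream(result, Row.class).print();
env.execute("Flink kafka test");
```

Appending a separately named attribute avoids depending on the exact type of f2. If f2 is a Long or a SQL timestamp, declaring $("f2").rowtime() in its place may also work.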