I define a POJO as follows:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class IdCount {
private Integer id;
private String name;
}
I test if it is a valid POJO with the following code:
public class Test {
public static boolean isPojoClass(Class<?> pojoClass) {
try {
TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(pojoClass);
return typeInfo instanceof PojoTypeInfo;
} catch (Exception e) {
return false;
}
}
public static void main(String[] args) {
boolean isPojo = isPojoClass(IdCount.class);
if (isPojo) {
System.out.println("This is a POJO class.");
} else {
System.out.println("This is not a POJO class.");
}
}
}
The printout result is as follows:
This is a POJO class.
My actual task is as follows:
public class TimeWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
tableEnv.executeSql("CREATE TABLE dataGen (\n" +
" id INT,\n" +
" name STRING\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.id.kind'='random',\n" +
" 'fields.id.min'='1',\n" +
" 'fields.id.max'='10',\n" +
" 'fields.name.length'='10'\n" +
")");
Table table = tableEnv.sqlQuery("select * from dataGen");
DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class);
dataStream
.countWindowAll(3)
.max("id")
.print()
;
env.execute();
}
}
The error message is as follows:
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
Cannot reference field by field expression on
*cn.chatdoge.flink117.POJO.IdCount<`id` INT, `name` STRING>*
(cn.chatdoge.flink117.POJO.IdCount, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
Field expressions are only supported on POJO types, tuples, and case classes.
(See the Flink documentation on what is considered a POJO.)
at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:256)
at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:78)
at org.apache.flink.streaming.api.datastream.AllWindowedStream.max(AllWindowedStream.java:1366)
at cn.chatdoge.flink117.window.TimeWindowExample.main(TimeWindowExample.java:55)
I don't understand why the test passed but I am getting an error during actual execution. Can you help me identify the issue? I use flink-1.17.2
I resolve by myself. A map function is needed.
DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class).map(
t -> new IdCount(t.getId(), t.getName())
);