apache-flink

The POJO class passes the test ,but shows invalid during execution


I define a POJO as follows:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class IdCount {
    private  Integer id;
    private  String name;
}

I test if it is a valid POJO with the following code:

public class Test {
    public static boolean isPojoClass(Class<?> pojoClass) {
        try {
            TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(pojoClass);
            return typeInfo instanceof PojoTypeInfo;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        boolean isPojo = isPojoClass(IdCount.class);

        if (isPojo) {
            System.out.println("This is a POJO class.");
        } else {
            System.out.println("This is not a POJO class.");
        }
    }
}

The printout result is as follows:

This is a POJO class.

My actual task is as follows:

public class TimeWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        tableEnv.executeSql("CREATE TABLE dataGen (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second'='1',\n" +
                "  'fields.id.kind'='random',\n" +
                "  'fields.id.min'='1',\n" +
                "  'fields.id.max'='10',\n" +
                "  'fields.name.length'='10'\n" +
                ")");

        Table table = tableEnv.sqlQuery("select * from dataGen");

        DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class);

        dataStream
                .countWindowAll(3)
                .max("id")
                .print()
        ;

        env.execute();
    }
}

The error message is as follows:

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: 
Cannot reference field by field expression on 
*cn.chatdoge.flink117.POJO.IdCount<`id` INT, `name` STRING>*
(cn.chatdoge.flink117.POJO.IdCount, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
Field expressions are only supported on POJO types, tuples, and case classes. 
(See the Flink documentation on what is considered a POJO.)
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:256)
    at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:78)
    at org.apache.flink.streaming.api.datastream.AllWindowedStream.max(AllWindowedStream.java:1366)
    at cn.chatdoge.flink117.window.TimeWindowExample.main(TimeWindowExample.java:55)

I don't understand why the test passed but I am getting an error during actual execution. Can you help me identify the issue? I use flink-1.17.2


Solution

  • I resolve by myself. A map function is needed.

    DataStream<IdCount> dataStream = tableEnv.toDataStream(table, IdCount.class).map(
        t -> new IdCount(t.getId(), t.getName())
    );