javaapache-flinkflink-streamingpojo

Apache Flink type information error for POJO containing List of Tuples


[Edit 1: added the correct error log this time]

[Edit 2: reduced fields from class and the TypeInfoFactory]

I am unable to create a POJO type class that contains a data type that is a list of tuples. Perhaps I am not providing it's type information correctly?

[Edited] Here is the class and the TypeInfo:

@TypeInfo(NodeTestSerializer.class)
public class NodeTest implements Serializable {
    public List<Tuple3<Integer,Integer, String>> msgDestinationList = new ArrayList<>();

    public NodeTest() {
        msgDestinationList.add(Tuple3.of(1,2,"abc"));
    };
}

and I have defined its type information as follows:

public class NodeTestSerializer extends TypeInfoFactory<NodeTest> {
    @Override
    public TypeInformation<NodeTest> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {

        TypeInformation<Tuple3<Integer,Integer, String>> tupleTypeInfo =
                TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>() {});

        ImmutableMap nMap = ImmutableMap.builder()
                .put("msgDestinationList", Types.LIST(Types.TUPLE(tupleTypeInfo)))
                .build();

        final TypeInformation<NodeTest> nodeInfo = Types.POJO(NodeTest.class,nMap);
        return nodeInfo;
    }
}
    

However it gives the following error if I try to run a simple map operation on a DataStream that contains objects of the above class. It runs fine if I disable generics env.getConfig().disableGenericTypes(); so the problem is related to POJO serialization.

java.io.UncheckedIOException: java.io.IOException: Serializing the source elements failed: class java.lang.Integer cannot be cast to class org.apache.flink.api.java.tuple.Tuple (java.lang.Integer is in module java.base of loader 'bootstrap'; org.apache.flink.api.java.tuple.Tuple is in unnamed module of loader 'app')
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:158)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:145)
    at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:112)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:430)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:398)
    at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:342)
    at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
    at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
    at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
    at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:832)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2230)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2216)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2043)
    at test.NodeTestMain.main(NodeTestMain.java:39)
Caused by: java.io.IOException: Serializing the source elements failed: class java.lang.Integer cannot be cast to class org.apache.flink.api.java.tuple.Tuple (java.lang.Integer is in module java.base of loader 'bootstrap'; org.apache.flink.api.java.tuple.Tuple is in unnamed module of loader 'app')
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:135)
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:156)
    ... 22 more
Caused by: java.lang.ClassCastException: class java.lang.Integer cannot be cast to class org.apache.flink.api.java.tuple.Tuple (java.lang.Integer is in module java.base of loader 'bootstrap'; org.apache.flink.api.java.tuple.Tuple is in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.serialize(ListSerializer.java:126)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.serialize(ListSerializer.java:42)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:365)
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:132)
    ... 23 more

I have tried other ways to provide the type information however I am all out of ideas on how to fix this error. Any help will be greatly appreciated!

NodeTestMain.java:

public class NodeTestMain {

    public static void main(final String[] args) throws IOException {
        try {

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableGenericTypes();   // disable generic types (Kryo)

            DataStream<NodeTest> nodeStream = env.fromElements(new NodeTest());
            nodeStream.map(i -> i.msgDestinationList.get(0).f0).print();

            env.execute("Flink Node Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

}

Solution

  • Thanks to @kkrugler I was able to solve this by changing Types.LIST(Types.TUPLE(tupleTypeInfo))) with Types.LIST(tupleTypeInfo)) where tupleTypeInfo is defined in my question. Not sure why this works and not the former notation but the class is now recognized by the POJO serializer.