apache-flink  flink-streaming  apache-hudi

How to provide TypeInformation for GenericRowData objects in Flink


I am using a deserializer to parse a Kafka stream of JSON strings, and I then use the GenericRowData class to convert each object node into a RowData instance, which Hudi supports for writing directly from a DataStream. I need to provide the TypeInformation so that Flink doesn't fall back to generic serialization, but I can't find a way to provide a TypeInformation that matches the GenericRowData type. I tried using TypeHint, but that doesn't work. Here's sample code for what I'm trying to achieve.

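    import java.io.IOException;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.jackson.JacksonMapperFactory;

    public class KafkaDeserializer implements DeserializationSchema<RowData> {

        // Created in open() because ObjectMapper should not be serialized with the schema.
        private transient ObjectMapper mapper;

        @Override
        public void open(InitializationContext context) {
            mapper = JacksonMapperFactory.createObjectMapper();
        }

        @Override
        public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
            // Default behavior: calls deserialize(byte[]) and emits the result if non-null.
            DeserializationSchema.super.deserialize(message, out);
        }

        @Override
        public RowData deserialize(byte[] message) throws IOException {
            JsonNode messagePayload = mapper.readTree(message);

            // StringData.fromString expects a String, so read each node as text.
            return GenericRowData.of(
                    StringData.fromString(messagePayload.get("x").asText()),
                    StringData.fromString(messagePayload.get("Y").asText()),
                    StringData.fromString(messagePayload.get("Z").asText())
                    // ........... (further fields elided)
                    );
        }

        @Override
        public boolean isEndOfStream(RowData nextElement) {
            // Required by the interface; never signal end of stream.
            return false;
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            // The TypeHint attempt mentioned above, which does not work as hoped.
            return TypeInformation.of(new TypeHint<RowData>() {});
        }
    }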

I need some guidance on how to implement the getProducedType() method above.


Solution

  • Just providing a TypeInformation isn't enough for Flink to pick a "good" serializer. The TypeInformation has to return a serializer other than Kryo for GenericRowData, and I'm pretty sure that won't just work out of the box.

    You could implement your own GenericRowDataTypeInfo class that extends TypeInformation<RowData> (RowData being the type your DeserializationSchema produces and the type RowDataSerializer handles), and returns Flink's RowDataSerializer from the createSerializer() method.
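
A minimal sketch of such a class might look like the following. It is untested and rests on several assumptions: it targets the Flink 1.x `createSerializer(ExecutionConfig)` signature (newer releases introduce a `SerializerConfig` variant, so the override may need adjusting), it assumes `flink-table-runtime` is on the classpath for `RowDataSerializer`, and it is typed as `TypeInformation<RowData>` because `RowDataSerializer` is a `TypeSerializer<RowData>`. The `RowType` constructor argument is also an assumption: it must describe the row's fields in the same order they are passed to `GenericRowData.of(...)`.

```java
import java.util.Objects;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;

/** Hypothetical TypeInformation that hands Flink a RowDataSerializer instead of Kryo. */
public class GenericRowDataTypeInfo extends TypeInformation<RowData> {

    private static final long serialVersionUID = 1L;

    // Logical schema of the row (assumed to match the fields built in deserialize()).
    private final RowType rowType;

    public GenericRowDataTypeInfo(RowType rowType) {
        this.rowType = Objects.requireNonNull(rowType, "rowType");
    }

    // The row is treated as a single opaque value rather than a composite type.
    @Override public boolean isBasicType() { return false; }
    @Override public boolean isTupleType() { return false; }
    @Override public int getArity() { return 1; }
    @Override public int getTotalFields() { return 1; }
    @Override public boolean isKeyType() { return false; }

    @Override
    public Class<RowData> getTypeClass() {
        return RowData.class;
    }

    @Override
    public TypeSerializer<RowData> createSerializer(ExecutionConfig config) {
        // The key step: return Flink's row serializer built from the schema.
        return new RowDataSerializer(rowType);
    }

    @Override
    public String toString() {
        return "GenericRowDataTypeInfo<" + rowType.asSummaryString() + ">";
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof GenericRowDataTypeInfo
                && rowType.equals(((GenericRowDataTypeInfo) obj).rowType);
    }

    @Override
    public int hashCode() {
        return rowType.hashCode();
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof GenericRowDataTypeInfo;
    }
}
```

With this in place, `getProducedType()` in the deserializer could return `new GenericRowDataTypeInfo(rowType)`, where `rowType` is built with, for example, `RowType.of(...)` and one `VarCharType` per string field.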