java apache-flink flink-streaming

Give Flink type information to serialize accumulator object without Kryo


I have a custom AggregateFunction with a signature like this:

public class CustomAggregateFunction
    implements AggregateFunction<CustomInput, AggregationAccumulator, CustomOutput> { code...}

My AggregationAccumulator is a simple object containing some maps, annotated with Lombok's @Data:

  @Data
  public static class AggregationAccumulator {
    private Map<String, Long> customMap = new HashMap<>();
  }

However, Flink logs the following message:

  13:18:50,091 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field AggregationAccumulator#customMap will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

How can I provide the type information so that Flink doesn't fall back to Kryo for this field?


Solution

  • You can add a @TypeInfo annotation to the map field and provide an appropriate TypeInfoFactory implementation. Something like this:

    import java.lang.reflect.Type;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.flink.api.common.functions.InvalidTypesException;
    import org.apache.flink.api.common.typeinfo.TypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;
    
    public class AccumulatorWithMap {
    
        @TypeInfo(MapTypeInfoFactory.class)
        private Map<String, Long> customMap = new HashMap<>();
        
        public AccumulatorWithMap() { }
    
        public Map<String, Long> getCustomMap() {
            return customMap;
        }
    
        public void setCustomMap(Map<String, Long> customMap) {
            this.customMap = customMap;
        }
        
        public static class MapTypeInfoFactory<K, V> extends TypeInfoFactory<Map<K, V>> {
    
            @SuppressWarnings("unchecked")
            @Override
            public TypeInformation<Map<K, V>> createTypeInfo(
                    Type t, Map<String, TypeInformation<?>> genericParameters) {
                // The entries are keyed by the type variable names of java.util.Map<K, V>.
                TypeInformation<K> keyType = (TypeInformation<K>)genericParameters.get("K");
                TypeInformation<V> valueType = (TypeInformation<V>)genericParameters.get("V");
    
                if (keyType == null) {
                    throw new InvalidTypesException(
                            "Type extraction is not possible on Map"
                                    + " type as it does not contain information about the 'key' type.");
                }
    
                if (valueType == null) {
                    throw new InvalidTypesException(
                            "Type extraction is not possible on Map"
                                    + " type as it does not contain information about the 'value' type.");
                }
                
                return new MapTypeInfo<K, V>(keyType, valueType);
            }
        }
    }
    

    To verify, use PojoTestUtils from flink-test-utils:

    PojoTestUtils.assertSerializedAsPojoWithoutKryo(AccumulatorWithMap.class);
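
In case it helps to see why the factory looks up "K" and "V": as far as I understand it, those keys are the type variable names declared by java.util.Map<K, V>, matched with the actual type arguments of the annotated field. The sketch below reproduces that pairing with plain Java reflection only. It is not Flink's implementation, and GenericParameterSketch, Accumulator, and describeField are hypothetical names used just for this illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class GenericParameterSketch {

    // Hypothetical stand-in for the accumulator; no Flink dependency is needed here.
    static class Accumulator {
        private Map<String, Long> customMap = new HashMap<>();
    }

    // Pairs each type variable name declared by the field's raw type
    // with the actual type argument used in the field declaration.
    static Map<String, Type> describeField(Class<?> owner, String fieldName) {
        Field field;
        try {
            field = owner.getDeclaredField(fieldName);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }

        Type genericType = field.getGenericType();
        if (!(genericType instanceof ParameterizedType)) {
            throw new IllegalArgumentException(fieldName + " is not a parameterized type");
        }

        ParameterizedType parameterized = (ParameterizedType) genericType;
        TypeVariable<?>[] variables = ((Class<?>) parameterized.getRawType()).getTypeParameters();
        Type[] arguments = parameterized.getActualTypeArguments();

        Map<String, Type> result = new LinkedHashMap<>();
        for (int i = 0; i < variables.length; i++) {
            result.put(variables[i].getName(), arguments[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints the variable-name-to-type pairing for the customMap field.
        System.out.println(describeField(Accumulator.class, "customMap"));
    }
}
```

For the customMap field this pairs K with String and V with Long, which is why a field declared as a raw Map (no type arguments) would leave the factory with nothing to look up and hit the InvalidTypesException branches.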