
Give Flink type information to serialize accumulator object without Kryo

I have a custom AggregateFunction with a signature like this:

public class CustomAggregateFunction
    implements AggregateFunction<CustomInput, AggregationAccumulator, CustomOutput> { code...}

My AggregationAccumulator is a simple object containing some maps, annotated with Lombok's @Data:

  public static class AggregationAccumulator {
    private Map<String, Long> customMap = new HashMap<>();

However, Flink logs the following message: 13:18:50,091 INFO [] - Field AggregationAccumulator#customMap will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

How can I provide the type information so that Flink doesn't fall back to Kryo for this field?


  • You can add a @TypeInfo annotation to the map field and provide an appropriate TypeInfoFactory implementation. Something like this:

    import java.lang.reflect.Type;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.api.common.functions.InvalidTypesException;
    import org.apache.flink.api.common.typeinfo.TypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;

    public class AccumulatorWithMap {

        // Tell Flink to build the type information for this field with the factory below.
        @TypeInfo(MapTypeInfoFactory.class)
        private Map<String, Long> customMap = new HashMap<>();

        // Flink POJOs need a public no-argument constructor.
        public AccumulatorWithMap() { }

        public Map<String, Long> getCustomMap() {
            return customMap;
        }

        public void setCustomMap(Map<String, Long> customMap) {
            this.customMap = customMap;
        }

        public static class MapTypeInfoFactory<K, V> extends TypeInfoFactory<Map<K, V>> {

            @SuppressWarnings("unchecked")
            @Override
            public TypeInformation<Map<K, V>> createTypeInfo(
                    Type t, Map<String, TypeInformation<?>> genericParameters) {
                TypeInformation<K> keyType = (TypeInformation<K>) genericParameters.get("K");
                TypeInformation<V> valueType = (TypeInformation<V>) genericParameters.get("V");
                if (keyType == null) {
                    throw new InvalidTypesException(
                            "Type extraction is not possible on Map"
                                    + " type as it does not contain information about the 'key' type.");
                }
                if (valueType == null) {
                    throw new InvalidTypesException(
                            "Type extraction is not possible on Map"
                                    + " type as it does not contain information about the 'value' type.");
                }
                return new MapTypeInfo<K, V>(keyType, valueType);
            }
        }
    }
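
    The factory looks up the entries "K" and "V" in genericParameters. As far as I understand it, those keys are the names of the type variables declared by Map<K, V>, paired with the type arguments written on the field. The plain-Java sketch below shows that pairing using reflection only. It is not Flink's type extractor; GenericParameterSketch, Accumulator and genericParametersOf are hypothetical names made up for the illustration.

    ```java
    import java.lang.reflect.Field;
    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;
    import java.lang.reflect.TypeVariable;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Plain-Java illustration only; this is NOT Flink's type extraction code.
    class GenericParameterSketch {

        // Hypothetical stand-in for the accumulator from the question.
        static class Accumulator {
            private Map<String, Long> customMap = new HashMap<>();
        }

        // Pairs each type variable declared by the field's raw type (K and V for Map)
        // with the actual type argument written on the field (String and Long here).
        static Map<String, String> genericParametersOf(Class<?> owner, String fieldName) {
            Field field;
            try {
                field = owner.getDeclaredField(fieldName);
            } catch (NoSuchFieldException e) {
                throw new IllegalArgumentException("No such field: " + fieldName, e);
            }
            Map<String, String> result = new LinkedHashMap<>();
            Type declared = field.getGenericType();
            if (declared instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) declared;
                Class<?> rawType = (Class<?>) parameterized.getRawType();
                TypeVariable<?>[] variables = rawType.getTypeParameters();
                Type[] arguments = parameterized.getActualTypeArguments();
                for (int i = 0; i < variables.length; i++) {
                    result.put(variables[i].getName(), arguments[i].getTypeName());
                }
            }
            return result;
        }

        public static void main(String[] args) {
            // prints {K=java.lang.String, V=java.lang.Long}
            System.out.println(genericParametersOf(Accumulator.class, "customMap"));
        }
    }
    ```

    If the field were declared as a raw Map, there would be no type arguments to pair up, which corresponds to the case where the factory above throws InvalidTypesException.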

    To verify, use:
