I have a custom AggregateFunction with a signature like this:
    public class CustomAggregateFunction
            implements AggregateFunction<CustomInput, AggregationAccumulator, CustomOutput> { code... }
My AggregationAccumulator is a simple object containing some maps, annotated with Lombok's @Data:
    @Data
    public static class AggregationAccumulator {
        private Map<String, Long> customMap = new HashMap<>();
    }
However, Flink logs the following message:
    13:18:50,091 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field AggregationAccumulator#customMap will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
How can I provide the type info so that it wouldn't use Kryo?
You can add a @TypeInfo annotation to the map field and provide an appropriate TypeInfoFactory implementation. Something like this:
    import java.lang.reflect.Type;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.InvalidTypesException;
    import org.apache.flink.api.common.typeinfo.TypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;

    public class AccumulatorWithMap {

        @TypeInfo(MapTypeInfoFactory.class)
        private Map<String, Long> customMap = new HashMap<>();

        public AccumulatorWithMap() { }

        public Map<String, Long> getCustomMap() {
            return customMap;
        }

        public void setCustomMap(Map<String, Long> customMap) {
            this.customMap = customMap;
        }

        public static class MapTypeInfoFactory<K, V> extends TypeInfoFactory<Map<K, V>> {

            @SuppressWarnings("unchecked")
            @Override
            public TypeInformation<Map<K, V>> createTypeInfo(
                    Type t, Map<String, TypeInformation<?>> genericParameters) {
                TypeInformation<K> keyType = (TypeInformation<K>) genericParameters.get("K");
                TypeInformation<V> valueType = (TypeInformation<V>) genericParameters.get("V");

                if (keyType == null) {
                    throw new InvalidTypesException(
                            "Type extraction is not possible on Map"
                                    + " type as it does not contain information about the 'key' type.");
                }

                if (valueType == null) {
                    throw new InvalidTypesException(
                            "Type extraction is not possible on Map"
                                    + " type as it does not contain information about the 'value' type.");
                }

                return new MapTypeInfo<K, V>(keyType, valueType);
            }
        }
    }
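The "K" and "V" keys looked up in createTypeInfo are the names of java.util.Map's own type variables, which is why the factory can recover String and Long for this field. The following is a minimal, Flink-free sketch of how that information can be read from a field declaration with plain Java reflection. It is only an illustration, not Flink's actual extraction code, and the names GenericParameterDemo, Accumulator, and resolveTypeArguments are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class GenericParameterDemo {

    // Hypothetical stand-in for the accumulator; it has no Flink dependency.
    static class Accumulator {
        private Map<String, Long> customMap = new HashMap<>();
    }

    /**
     * Pairs each type-variable name of the field's raw type (e.g. "K", "V" for Map)
     * with the actual type argument written on the field declaration.
     * Returns an empty map for raw or non-generic fields.
     */
    static Map<String, Type> resolveTypeArguments(Field field) {
        Map<String, Type> resolved = new LinkedHashMap<>();
        Type generic = field.getGenericType();
        if (!(generic instanceof ParameterizedType)) {
            return resolved;
        }
        ParameterizedType parameterized = (ParameterizedType) generic;
        Class<?> raw = (Class<?>) parameterized.getRawType();
        TypeVariable<?>[] variables = raw.getTypeParameters();
        Type[] arguments = parameterized.getActualTypeArguments();
        for (int i = 0; i < variables.length; i++) {
            resolved.put(variables[i].getName(), arguments[i]);
        }
        return resolved;
    }

    public static void main(String[] args) throws NoSuchFieldException {
        Field field = Accumulator.class.getDeclaredField("customMap");
        // Prints {K=class java.lang.String, V=class java.lang.Long}
        System.out.println(resolveTypeArguments(field));
    }
}
```

If the field were declared as a raw Map, no type arguments would be available, which corresponds to the null checks in the factory above.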
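To verify that the class is serialized as a POJO without falling back to Kryo, use PojoTestUtils from flink-test-utils: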
PojoTestUtils.assertSerializedAsPojoWithoutKryo(AccumulatorWithMap.class);