Using a Collections$UnmodifiableCollection with Apache Flink


While using Apache Flink with the following code:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {

    @Override
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
        // Guava's greatestOf() returns an unmodifiable list
        List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
        collector.collect(top5);
    }
}).flatten();

I got this exception:

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

How can I use the UnmodifiableCollection with Flink?
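The failing step in the trace can be reproduced in plain Java, without Flink or Kryo. This is an illustrative sketch, not Kryo's code: the names `UnmodifiableRoundTrip`, `naiveRebuild` and `wrapperAwareRebuild` are made up, and `naiveRebuild` only mimics the "obtain an instance of the same collection class, then call add() for each element" pattern visible in the `CollectionSerializer.read` frame. `wrapperAwareRebuild` shows the alternative idea of rebuilding a modifiable backing list and wrapping it again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UnmodifiableRoundTrip {

    // Mimics the pattern from the stack trace: get an (empty) instance of the
    // original class, then add() every element. An unmodifiable wrapper around
    // an empty ArrayList has the same runtime class as the original list here.
    static List<String> naiveRebuild(List<String> elements) {
        List<String> target = Collections.unmodifiableList(new ArrayList<>());
        for (String e : elements) {
            target.add(e); // always throws UnsupportedOperationException
        }
        return target;
    }

    // Alternative: fill a modifiable backing list first, then wrap it again.
    static List<String> wrapperAwareRebuild(List<String> elements) {
        List<String> backing = new ArrayList<>();
        for (String e : elements) {
            backing.add(e);
        }
        return Collections.unmodifiableList(backing);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        List<String> top = Collections.unmodifiableList(Arrays.asList("c", "b", "a"));

        // The concrete wrapper class is a subclass of Collections$UnmodifiableCollection,
        // which is why the trace shows UnmodifiableCollection.add.
        Class<?> base = Class.forName("java.util.Collections$UnmodifiableCollection");
        System.out.println(top.getClass().getName() + " extends UnmodifiableCollection: "
                + base.isAssignableFrom(top.getClass()));

        try {
            naiveRebuild(top);
        } catch (UnsupportedOperationException expected) {
            System.out.println("naive rebuild failed: " + expected.getClass().getSimpleName());
        }

        System.out.println("wrapper-aware rebuild: " + wrapperAwareRebuild(top));
    }
}
```

The second strategy is, as far as I know, the idea behind wrapper-aware serializers such as the one used in the answer: they serialize the wrapped collection and re-apply the matching `Collections.unmodifiableXxx` wrapper on read.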


Solution

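  • The problem is that Kryo's default CollectionSerializer cannot deserialize the collection. As the stack trace shows, it re-creates the collection and then calls add() for every element, and add() on an unmodifiable collection always throws an UnsupportedOperationException.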

    To resolve the issue, we can use the UnmodifiableCollectionsSerializer (de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer) from the kryo-serializers project. Flink transitively depends on that project, so there is no need to add it as a dependency.

    Next, we have to register the serializer with Flink's Kryo instances.

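    import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    // Package-private class: look it up by name (Class.forName throws ClassNotFoundException)
    Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
    see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);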

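    Usually, we don't have to call Class.forName() to register a serializer, but java.util.Collections$UnmodifiableCollection is package-private, so we cannot reference it directly in our code (for example as UnmodifiableCollection.class). Note that Class.forName() throws the checked ClassNotFoundException, which the surrounding method has to catch or declare.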
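Registering the serializer for Collections$UnmodifiableCollection alone should also cover unmodifiable lists and sets, because the concrete wrappers (UnmodifiableList, UnmodifiableRandomAccessList, UnmodifiableSet, and so on) extend it, and, to the best of my knowledge, Kryo chooses a default serializer by checking whether a registered type is assignable from the runtime class. The hypothetical `DefaultSerializerLookup` below is not Kryo's implementation; it only models that matching rule (the first registered matching type wins, with plain strings standing in for serializers) and shows that Class.forName() can load the package-private class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DefaultSerializerLookup {

    // Hypothetical registry: insertion order is priority, and a registered type
    // matches any runtime class that is assignable to it (i.e. subclasses too).
    private final Map<Class<?>, String> defaults = new LinkedHashMap<>();

    void addDefault(Class<?> type, String serializerName) {
        defaults.put(type, serializerName);
    }

    String lookup(Class<?> runtimeType) {
        for (Map.Entry<Class<?>, String> e : defaults.entrySet()) {
            if (e.getKey().isAssignableFrom(runtimeType)) {
                return e.getValue();
            }
        }
        return "FallbackSerializer";
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // The class cannot be named in source code, but it can be loaded by name.
        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");

        DefaultSerializerLookup registry = new DefaultSerializerLookup();
        registry.addDefault(unmodColl, "UnmodifiableCollectionsSerializer");
        registry.addDefault(Collection.class, "CollectionSerializer");

        List<String> list = Collections.unmodifiableList(new ArrayList<>(Arrays.asList("a", "b")));
        System.out.println(registry.lookup(list.getClass()));   // UnmodifiableCollectionsSerializer
        System.out.println(registry.lookup(
                Collections.unmodifiableSet(new HashSet<String>()).getClass())); // UnmodifiableCollectionsSerializer
        System.out.println(registry.lookup(ArrayList.class));   // CollectionSerializer
    }
}
```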