apache-flink rocksdb

Flink with RocksDB fails when doing aggregation


I have a job that aggregates records into a model which contains, for example, a Map<MetricDim, Long> field. Shortly after the aggregation starts, the job switches from RUNNING to FAILING:


${time} INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job $JobName ($jobId) switched from state RUNNING to FAILING.
java.lang.ClassCastException: com.aggregation.MetricDim cannot be cast to java.lang.String
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:84)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:274)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:357)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.getValueBytes(AbstractRocksDBState.java:184)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:79)
        at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:103)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:750)

The same code runs fine with a non-RocksDB state backend such as FsStateBackend, but with RocksDB it always fails. My Flink version is 1.7.1.

The job was not started from a savepoint.

I don't know how to resolve this.

Could anyone help?


Solution

  • Defining a custom MapSerializer and configuring the job to use it solved the problem.
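
The answer does not include the serializer itself, so the following is only a sketch of what such a serializer might look like, not the code that was actually used. In the trace above, the RocksDB backend serializes the accumulator on every add(), and Kryo's built-in MapSerializer ends up writing a MetricDim key with a String serializer. The sketch avoids that by writing each key and value together with its runtime class. The names ClassAwareMapSerializer and KryoSetup are hypothetical, the map is assumed to be a java.util.HashMap at runtime, and the read() signature assumes the Kryo 2.x version bundled with Flink 1.7.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical replacement for Kryo's MapSerializer.
 * Every key and value is written with its runtime class, so no
 * serializer is ever chosen from the declared generic types.
 */
public class ClassAwareMapSerializer extends Serializer<HashMap<Object, Object>> {

    @Override
    public void write(Kryo kryo, Output output, HashMap<Object, Object> map) {
        // Entry count first, as a variable-length non-negative int.
        output.writeInt(map.size(), true);
        for (Map.Entry<Object, Object> entry : map.entrySet()) {
            kryo.writeClassAndObject(output, entry.getKey());
            kryo.writeClassAndObject(output, entry.getValue());
        }
    }

    // Kryo 2.x signature (assumption: the version shipped with Flink 1.7).
    @Override
    public HashMap<Object, Object> read(Kryo kryo, Input input,
                                        Class<HashMap<Object, Object>> type) {
        int size = input.readInt(true);
        HashMap<Object, Object> map = new HashMap<>();
        // Register the new map before reading children so that
        // Kryo's reference tracking can resolve it.
        kryo.reference(map);
        for (int i = 0; i < size; i++) {
            Object key = kryo.readClassAndObject(input);
            Object value = kryo.readClassAndObject(input);
            map.put(key, value);
        }
        return map;
    }
}

/** Hypothetical helper showing where the serializer is attached to the job. */
class KryoSetup {
    static void configure(StreamExecutionEnvironment env) {
        // Assumption: the aggregated field holds a HashMap at runtime.
        env.getConfig().registerTypeWithKryoSerializer(
                HashMap.class, ClassAwareMapSerializer.class);
    }
}
```

KryoSetup.configure(env) would be called right after the StreamExecutionEnvironment is created and before the pipeline is built. If the field holds a different Map implementation at runtime, that class would need to be registered instead.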