I encountered a problem where Flink gets stuck while writing broadcast state, at:
ctx.collect(data)
where ctx is the SourceContext.
No exception is thrown, and every thread dump shows the thread at roughly the same point. The thread is RUNNABLE, not blocked:
"Legacy Source Thread - Source: deviceInfoReader (1/1)#0" Id=86 RUNNABLE
at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:299)
at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:124)
at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:42)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:473)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:368)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:195)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:188)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:316)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.broadcastEmit(BroadcastRecordWriter.java:48)
at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.emit(BroadcastRecordWriter.java:41)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
- locked java.lang.Object@11434500
at xxx
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Any idea why this could happen? Could it be a slow loop in the serialization process? The data is large, but it never caused any problems before.
The task does not fail; it just keeps looping around this point.
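To get a feel for whether buffer growth alone could explain a "slow loop", here is a minimal sketch that models a growable serialization buffer. ASSUMPTION: the growth rule (at least double, or enough for the pending write, copying the old bytes each time) only approximates what DataOutputSerializer.resize does; the class, method names and sizes are hypothetical and this is not Flink's code.

```java
// Simplified model of a growable serialization buffer. ASSUMPTION: the growth rule
// (at least double, or enough for the pending write) approximates Flink's
// DataOutputSerializer; this is not Flink's actual code.
public class BufferGrowthSketch {

    /** Outcome of a simulated serialization: resizes, bytes copied, final capacity. */
    static final class Growth {
        final int resizes;
        final long bytesCopied;
        final long finalCapacity;

        Growth(int resizes, long bytesCopied, long finalCapacity) {
            this.resizes = resizes;
            this.bytesCopied = bytesCopied;
            this.finalCapacity = finalCapacity;
        }

        @Override
        public String toString() {
            return "resizes=" + resizes + ", bytesCopied=" + bytesCopied
                    + ", finalCapacity=" + finalCapacity;
        }
    }

    /** Simulates writing recordSize bytes, chunkSize bytes at a time, into a growable buffer. */
    static Growth simulate(long initialCapacity, long recordSize, int chunkSize) {
        long capacity = initialCapacity;
        long position = 0;
        int resizes = 0;
        long copied = 0;
        while (position < recordSize) {
            long chunk = Math.min(chunkSize, recordSize - position);
            if (position + chunk > capacity) {
                // Allocate a new array: at least double, or big enough for this chunk.
                capacity = Math.max(capacity * 2, position + chunk);
                copied += position; // every byte written so far is copied into the new array
                resizes++;
            }
            position += chunk;
        }
        return new Growth(resizes, copied, capacity);
    }

    public static void main(String[] args) {
        // Hypothetical sizes: 256-byte initial buffer, 100 MiB record, 4 KiB flushes.
        System.out.println(simulate(256, 100L * 1024 * 1024, 4096));
    }
}
```

With these hypothetical sizes it prints `resizes=16, bytesCopied=134213632, finalCapacity=134217728`. Under this model the total copying stays linear in the record size, so doubling by itself should not loop for long; the expensive part is that the last step needs a fresh contiguous 128 MiB array while the old 64 MiB one is still alive, which is exactly where a nearly full heap would hurt.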
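I'm guessing you're running low on heap space, and thus are "hung" in GC hell. The top frame, DataOutputSerializer.resize, is where the serialization buffer grows by allocating a new, larger byte array and copying the existing bytes into it. Serializing a very large record therefore needs large contiguous allocations, and with a nearly full heap the JVM can spend most of its time collecting garbage, which fits a thread that shows as RUNNABLE yet barely makes progress.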
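One way to test the GC theory is to measure how much wall-clock time the JVM spends in garbage collection. Below is a minimal sketch using the standard java.lang.management API. It measures the JVM it runs in, so it would have to run inside the TaskManager JVM (or be adapted to read the same MXBeans over JMX); the class and method names are hypothetical.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcPressureSketch {

    /** Accumulated GC time in ms across all collectors; collectors reporting -1 are skipped. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if this collector does not report it
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** Approximate fraction of wall-clock time spent in GC during a sampling window. */
    static double gcFraction(long sampleMillis) {
        long gcBefore = totalGcMillis();
        long start = System.nanoTime();
        try {
            Thread.sleep(sampleMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status, report what we have
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        long gcDelta = totalGcMillis() - gcBefore;
        return elapsedMs == 0 ? 0.0 : (double) gcDelta / elapsedMs;
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
        long maxMb = rt.maxMemory() / (1024 * 1024);
        double fraction = gcFraction(5_000);
        System.out.printf("heap used: %d MB of %d MB max, GC time: %.1f%% of the last 5 s%n",
                usedMb, maxMb, fraction * 100);
    }
}
```

From outside the process, `jstat -gcutil <pid> 1000` gives similar information. An old generation (the O column) pinned near 100% together with a steadily climbing full-GC count (FGC) would support the low-heap explanation.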