java, serialization, apache-flink, kryo

Flink Stuck on Broadcast


I encountered a problem where Flink gets stuck while writing broadcast state at:

ctx.collect(data)

where ctx is the SourceContext.

No exception is thrown. Repeated thread dumps keep showing the thread around the same frames, but it is RUNNABLE rather than blocked:

"Legacy Source Thread - Source: deviceInfoReader (1/1)#0" Id=86 RUNNABLE
    at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:299)
    at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:42)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:473)
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:368)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:195)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:188)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:316)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
    at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.broadcastEmit(BroadcastRecordWriter.java:48)
    at org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter.emit(BroadcastRecordWriter.java:41)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
    -  locked java.lang.Object@11434500
    at xxx
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)

Any idea why this could happen? Could it be a slow loop in the serialization process? The data is large, but it never caused any problems before.

The task does not fail; it just keeps looping around this point.
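The top frames show Kryo's Output.flush pushing bytes into Flink's DataOutputSerializer, which then calls resize. To see what a grow-and-copy output buffer costs for one large record, here is a simplified stand-in. GrowingBuffer is hypothetical and is not Flink's class. The doubling growth policy is an assumption, not a description of Flink's exact implementation.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration (NOT Flink's DataOutputSerializer): a byte buffer
 * that grows by allocating a larger array and copying the old contents
 * whenever a write does not fit.
 */
public class GrowingBuffer {
    private byte[] buffer;
    private int position;
    private int resizeCount;
    private long bytesCopied;

    public GrowingBuffer(int initialCapacity) {
        this.buffer = new byte[initialCapacity];
    }

    public void write(byte[] src) {
        ensureCapacity(position + src.length);
        System.arraycopy(src, 0, buffer, position, src.length);
        position += src.length;
    }

    private void ensureCapacity(int required) {
        if (required <= buffer.length) {
            return;
        }
        // Assumed doubling policy. During the copy both the old and the new
        // array are live, so peak heap use is roughly old + new capacity.
        int newCapacity = Math.max(required, buffer.length * 2);
        bytesCopied += position;
        buffer = Arrays.copyOf(buffer, newCapacity);
        resizeCount++;
    }

    public int size() { return position; }
    public int capacity() { return buffer.length; }
    public int resizeCount() { return resizeCount; }
    public long bytesCopied() { return bytesCopied; }

    public static void main(String[] args) {
        GrowingBuffer out = new GrowingBuffer(64);
        byte[] chunk = new byte[1024];
        // Write 8 MiB in 1 KiB chunks, similar to many small string writes.
        for (int i = 0; i < 8 * 1024; i++) {
            out.write(chunk);
        }
        System.out.println("size=" + out.size()
                + " capacity=" + out.capacity()
                + " resizes=" + out.resizeCount()
                + " bytesCopied=" + out.bytesCopied());
    }
}
```

Under geometric growth, the total number of bytes copied stays below the final record size, so resizing alone is not a quadratic loop. What grows is the size of each new contiguous array, and the old and new arrays must both be live at the same moment. When the heap is nearly full, each such allocation can trigger a garbage collection. That would fit a thread that stays RUNNABLE inside resize while making little progress.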


Solution

  • I'm guessing you're running low on heap space, so the JVM spends most of its time in garbage collection and the task only appears "hung" (GC hell).
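One way to test the GC theory is to compare accumulated GC time against wall-clock time. Below is a minimal sketch using the standard java.lang.management API. It only measures the JVM it runs in, so it is useful only inside the stuck task's process (for Flink, the TaskManager). GcOverheadProbe and the 0.5 threshold are illustrative choices, not established values. Enabling GC logging on the TaskManager (for example -Xlog:gc on JDK 9+, or -verbose:gc on older JDKs) gives the same signal without any code.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical diagnostic: estimates the share of wall-clock time spent in GC. */
public class GcOverheadProbe {

    /** Accumulated GC time in ms across all collectors; undefined (-1) values are skipped. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** Approximate fraction of a sampling window spent in GC (concurrent collectors may push it above 1). */
    static double gcFraction(long windowMillis) throws InterruptedException {
        long gcBefore = totalGcMillis();
        long start = System.nanoTime();
        Thread.sleep(windowMillis);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        long gcDelta = totalGcMillis() - gcBefore;
        return elapsedMillis == 0 ? 0.0 : (double) gcDelta / elapsedMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        double fraction = gcFraction(1000);
        System.out.printf("heap used=%d MiB, max=%d MiB, gc fraction=%.2f%n",
                heap.getUsed() >> 20, heap.getMax() >> 20, fraction);
        // Arbitrary rule-of-thumb threshold for this sketch.
        if (fraction > 0.5) {
            System.out.println("Most of the time is going to GC: likely heap pressure");
        }
    }
}
```

If the fraction stays high while the source thread sits in DataOutputSerializer.resize, giving the TaskManager more heap or shrinking the broadcast records is the obvious next step.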