serializationapache-stormkryostormcrawler

java.util.ConcurrentModificationException when adding some key to metadata in stormcrawler


I have added a field to metadata for transferring and persisting in the status index. The field is a List of String and its name is input_keywords. After running topology in the Strom cluster, The topology halted with the following logs:

java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_loop_STAR_$fn__4132.invoke(disruptor.clj:84) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_112]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
2021-02-27 08:03:34.276 o.a.s.d.executor Thread-20-disruptor-executor[45 45]-send-queue [ERROR] 
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$consume_loop_STAR_$fn__4132.invoke(disruptor.clj:84) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_112]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_112]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    ... 6 more
2021-02-27 08:03:34.327 o.a.s.util Thread-20-disruptor-executor[45 45]-send-queue [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn_10827$fn_10828.invoke(worker.clj:781) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.daemon.executor$mk_executor_data$fn_10034$fn_10035.invoke(executor.clj:281) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]

We have different parallelism hints for each components of topology. After adding the input_keywords to metadata we got the error. What is the main reason of the Error?


Solution

  • You are modifying a Metadata instance while it is being serialized. You can't do that, see Storm troubleshooting page.

    As explained in the release notes of 1.16, you can lock the metadata. This won't fix the issue but will tell you where in your code you are writing into the metadata.

    In our topology, we emitted the same metadata to multiple bolts at the same time. 
    

    mystery explained. don't do that.