java, apache-kafka, apache-storm, apache-storm-topology

Storm bolt can't deserialize object from spout


I'm making a Spring application using Storm 1.1.2, JDK 8 (Storm didn't like JDK 9-compiled code), Kafka 0.11, and Docker Compose.

The idea is to have a containerized service that can receive REST calls to create Storm topologies and then submit them to a Storm cluster. Everything worked locally, but moving from submitting to a local cluster to using StormSubmitter is causing issues. I've resolved most of them, but one strange serialization issue remains.

I have a spout that successfully reads from Kafka. It reads byte arrays of Protobuf objects and uses a custom deserializer to create the Messages from them. Two bolts read from this spout: one that prints the incoming messages (bolt A) and one that filters the messages on a field and emits them to another bolt for aggregation (bolt B).

The only difference I notice between the two bolts is that bolt B has a constructor and bolt A does not.

For some reason bolt A has no trouble receiving the messages from the spout and printing them, but every time a message reaches bolt B it throws com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): my.package.MyProtobufMessage. I see that you can register serializers for classes, but why can bolt A process the message when bolt B cannot?

Also (a separate issue): when I add a third topology, Nimbus doesn't assign a supervisor to it. One topology will be up with 2 workers and 9 executors, a second will be up with 2 workers and 6 executors, and then I'll add a third that shows up in the UI and the Nimbus logs but not in the supervisor logs. In the UI the third topology has 0 workers, 0 executors, and 0 assigned memory.


Solution

  • You might be "lucky" in that the spout and bolt A are in the same worker, while bolt B is in another worker. Storm doesn't serialize tuples unless they are transferred to another worker, which may be why bolt A can read the message.
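
    If that's the cause, one fix is to register a custom Kryo serializer for the protobuf class so tuples survive the transfer between workers. Below is a minimal sketch, assuming my.package.MyProtobufMessage is a standard generated protobuf class; the serializer class name is made up for illustration. It round-trips the message through protobuf's own binary format instead of letting Kryo instantiate the class reflectively:

        import com.esotericsoftware.kryo.Kryo;
        import com.esotericsoftware.kryo.Serializer;
        import com.esotericsoftware.kryo.io.Input;
        import com.esotericsoftware.kryo.io.Output;
        import com.google.protobuf.InvalidProtocolBufferException;

        // Hypothetical serializer: writes the message as a length-prefixed
        // byte array and parses it back with the generated protobuf parser.
        public class MyProtobufMessageSerializer extends Serializer<MyProtobufMessage> {
            @Override
            public void write(Kryo kryo, Output output, MyProtobufMessage msg) {
                byte[] bytes = msg.toByteArray();
                output.writeInt(bytes.length, true);
                output.writeBytes(bytes);
            }

            @Override
            public MyProtobufMessage read(Kryo kryo, Input input, Class<MyProtobufMessage> type) {
                byte[] bytes = input.readBytes(input.readInt(true));
                try {
                    return MyProtobufMessage.parseFrom(bytes);
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("Failed to parse MyProtobufMessage", e);
                }
            }
        }

    Then register it on the topology config before submitting:

        Config conf = new Config();
        conf.registerSerialization(MyProtobufMessage.class, MyProtobufMessageSerializer.class);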

    Regarding the third topology question, you need to make sure your supervisors have enough worker slots for your topologies. Each supervisor defines in the Storm configuration (storm.yaml) how many worker JVMs it's willing to run. I'm guessing the first two topologies are occupying all the slots.

    The default configuration for the supervisors is

        supervisor.slots.ports:
            - 6700
            - 6701
            - 6702
            - 6703
    

    which allows 4 worker JVMs on each supervisor. Since worker JVMs are not shared between topologies, if you're running 2 topologies that take 2 workers each, you've used up all 4 slots. You can add more slots, add more supervisor machines, or reduce the number of workers your topologies request, as sketched below.
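
    For example, rather than adding slots, you can shrink a topology's footprint when you submit it. A sketch, where the topology name and the builder wiring are placeholders:

        import org.apache.storm.Config;
        import org.apache.storm.StormSubmitter;
        import org.apache.storm.topology.TopologyBuilder;

        TopologyBuilder builder = new TopologyBuilder();
        // ... wire up your spout and bolts here ...

        Config conf = new Config();
        conf.setNumWorkers(1); // request a single worker slot instead of two

        StormSubmitter.submitTopology("third-topology", conf, builder.createTopology());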