
InvalidTopologyException(msg:Component: [x] subscribes from non-existent stream [y])


I'm trying to read data from Kafka and insert it into Cassandra using Storm. I've configured the topology, but I'm getting an error and I have no idea why it is happening.

Here is the relevant part of my topology submitter:

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
        topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
        topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt"); 

If I comment out the last line, I don't see any exceptions. With the last line included, I get the following error:

InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])

Can someone please help me figure out what is wrong here?

Here is the declareOutputFields method in CheckingBolt:

        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
        }

The declareOutputFields method of CassandraInsertBolt is empty, since that bolt doesn't emit any values.

TIA


Solution

  • The problem here is that you're mixing up stream names and component (i.e. spout/bolt) names. Component names refer to different spouts and bolts, while stream names refer to different streams coming out of the same component. For example, a bolt named "evenOrOddBolt" might emit two streams, an "even" stream and an "odd" stream. In many cases, though, a bolt has only one output stream, which is why Storm has convenience methods that use a default stream name ("default", as in your error message).

    When you call .shuffleGrouping("checkingbolt"), you are using one of these convenience methods, effectively saying "I want this bolt to consume the default stream coming out of checkingbolt". There is an overload, shuffleGrouping(componentId, streamId), that names the stream explicitly; you only need it when consuming a non-default stream, which usually means the source bolt emits more than one stream.

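    When you call ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));, you are saying that the bolt will emit on a stream named "cassandraBoltStream". That is probably not what you want: you want to declare that it emits on the default stream. Do this with ofd.declare(new Fields("jsonFields")) instead, and make sure the bolt emits with collector.emit(values) rather than collector.emit(streamId, values). Alternatively, keep the named stream and subscribe to it explicitly with .shuffleGrouping("checkingbolt", "cassandraBoltStream").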

    See the Storm documentation on streams and stream groupings for more details.
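To make the mismatch concrete, here is a small, stdlib-only Java model of the check behind this exception. It is a toy sketch, not Storm's actual implementation: the class StreamCheck and its methods declareStream, subscribe and validate are hypothetical. The only details taken from Storm are the default stream id "default" and the error text from the question. It shows the original wiring failing, then both fixes (declaring the default stream, or subscribing to the named stream) passing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of a topology's stream-subscription check (NOT Storm's real code).
// StreamCheck, declareStream, subscribe and validate are hypothetical names.
class StreamCheck {
    // Storm's default stream id, as it appears in the error message.
    static final String DEFAULT_STREAM = "default";

    // component id -> stream ids that component declares as outputs
    private final Map<String, Set<String>> declared = new HashMap<>();
    // each subscription: {subscriber, source component, stream id}
    private final List<String[]> subscriptions = new ArrayList<>();

    // Roughly what a component's declareOutputFields records.
    void declareStream(String component, String streamId) {
        declared.computeIfAbsent(component, k -> new HashSet<>()).add(streamId);
    }

    // Like shuffleGrouping(componentId): implicitly the default stream.
    void subscribe(String subscriber, String source) {
        subscribe(subscriber, source, DEFAULT_STREAM);
    }

    // Like shuffleGrouping(componentId, streamId): an explicit stream.
    void subscribe(String subscriber, String source, String streamId) {
        subscriptions.add(new String[] {subscriber, source, streamId});
    }

    // Returns an error for the first subscription to an undeclared stream, or null if all are valid.
    String validate() {
        for (String[] s : subscriptions) {
            Set<String> streams = declared.getOrDefault(s[1], Collections.emptySet());
            if (!streams.contains(s[2])) {
                return "Component: [" + s[0] + "] subscribes from non-existent stream: ["
                        + s[2] + "] of component [" + s[1] + "]";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Original wiring: checkingbolt declares only "cassandraBoltStream",
        // but cassandrabolt subscribes to checkingbolt's default stream.
        StreamCheck broken = new StreamCheck();
        broken.declareStream("spout", DEFAULT_STREAM);
        broken.declareStream("checkingbolt", "cassandraBoltStream");
        broken.subscribe("checkingbolt", "spout");
        broken.subscribe("cassandrabolt", "checkingbolt");
        System.out.println("original: " + broken.validate());

        // Fix 1: checkingbolt declares the default stream (ofd.declare(...)).
        StreamCheck fix1 = new StreamCheck();
        fix1.declareStream("spout", DEFAULT_STREAM);
        fix1.declareStream("checkingbolt", DEFAULT_STREAM);
        fix1.subscribe("checkingbolt", "spout");
        fix1.subscribe("cassandrabolt", "checkingbolt");
        System.out.println("fix 1: " + fix1.validate());

        // Fix 2: keep the named stream and subscribe to it explicitly.
        StreamCheck fix2 = new StreamCheck();
        fix2.declareStream("spout", DEFAULT_STREAM);
        fix2.declareStream("checkingbolt", "cassandraBoltStream");
        fix2.subscribe("checkingbolt", "spout");
        fix2.subscribe("cassandrabolt", "checkingbolt", "cassandraBoltStream");
        System.out.println("fix 2: " + fix2.validate());
    }
}
```

In the model, as in the question, the spout to checkingbolt link is fine because both sides use the default stream; only the cassandrabolt subscription fails, and either fix makes validate() return null.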