apache-storm, apache-storm-topology

Is there a Java API to know when a topology is ready to read the first message from a spout?


Our Apache Storm topology reads messages from Kafka using KafkaSpout and, after a lot of mapping, reducing, enrichment, aggregation and so on, inserts the data into Cassandra. A second Kafka input receives user queries for that data; if the topology finds a response, it sends it to a third Kafka topic. We now want to write an end-to-end test with JUnit in which we programmatically insert data into the topology, then send a user query message, and finally assert that the response received on the third topic is correct.

To achieve this, we plan to start EmbeddedKafka and CassandraUnit, use them in place of the real Kafka and Cassandra, and start the topology within this single JUnit test.

Before the actual test starts, we create the topology and submit it to a LocalCluster in the @Before method. The topology starts on a different thread, so @Before returns and the test begins executing while the topology is still starting up and not yet ready to process anything. Is there a Java API that can tell us when the topology is ready for processing (that is, ready to read the first message from the spout)?


Solution

  • This depends on what you mean when you say "ready for processing".

    If you enable time simulation for your LocalCluster, you can use LocalCluster.advanceClusterTime to advance the simulated time in steps. If you call this method after submitting a topology, it will only return once the cluster is mostly idling. See e.g. https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233.

    If you're willing to replace your spouts with stubs (e.g. FixedTupleSpout), you can use Testing.completeTopology to wait until the topology has finished processing all the tuples you set up the stub to emit.

    Another way to wait until the topology has processed some tuples is to put some messages in Kafka, start your topology, and then have your testing thread poll Cassandra until the messages you expect have made it through. This way, you can set a timeout in your testing thread and have the test fail if the condition is not met within some number of seconds. You could use a utility like Awaitility (https://github.com/awaitility/awaitility) for this, or just write your own polling logic.

    If you mean something else by "ready for processing", please describe in more detail what you mean.
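
If you go with the "write your own polling logic" option from the answer above, a minimal sketch could look like the following. It uses only the JDK; the class name PollingWait and the method waitUntil are hypothetical names chosen for illustration, not part of Storm, Kafka, Cassandra, or Awaitility.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Storm or Awaitility API): polls a condition
// until it becomes true or the timeout elapses.
final class PollingWait {
    private PollingWait() {}

    /**
     * Returns true as soon as the condition holds, or false if the timeout
     * expires (or the thread is interrupted) before that happens.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
            if (remainingMillis <= 0) {
                return false;
            }
            try {
                // Never sleep past the deadline, and always sleep at least 1 ms.
                Thread.sleep(Math.max(1, Math.min(pollInterval.toMillis(), remainingMillis)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

In the JUnit test, the condition would be whatever check fits your setup, for example a lambda that queries CassandraUnit for the expected row or checks whether the consumer on the third Kafka topic has received a response. If waitUntil returns false, fail the test. This tells you that the messages made it through within the timeout, not that the topology reached any particular internal "ready" state.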