I read JSON messages from Kafka. FieldExtractionBolt parses each JSON message, extracts its fields into tuple values and passes them to CassandraWriterBolt, which in turn writes one record to Cassandra with each tuple value in a separate column.
JSON message on Kafka:
{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}
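FieldExtractionBolt:
String message = tuple.getStringByField("message");
// Gson deserializes JSON numbers inside an untyped Map as Double, so bid and ask are emitted as Double values
Map<?, ?> values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));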
CassandraWriterBolt:
return new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid"))));
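As far as I understand storm-cassandra, fields("pair", "ask", "bid") picks tuple values by field name, so the placeholder order follows the column list in the INSERT rather than the order in which FieldExtractionBolt emits its values. The plain-Java model below illustrates that binding. It assumes FieldExtractionBolt declares its output fields as pair, bid, ask (its declareOutputFields is not shown), and BindByName/bindValues are hypothetical names, not the storm-cassandra API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java model of binding CQL "?" placeholders by tuple field name (hypothetical, not storm-cassandra code). */
public class BindByName {
    // Assumption: the field names FieldExtractionBolt declares, in the order it emits values.
    static final List<String> DECLARED = Arrays.asList("pair", "bid", "ask");

    /** Returns the values to bind, in the order of the requested field names. */
    public static List<Object> bindValues(List<Object> tupleValues, String... fieldNames) {
        return Arrays.stream(fieldNames)
                .map(name -> {
                    int index = DECLARED.indexOf(name);
                    if (index < 0) {
                        throw new IllegalArgumentException("Unknown field: " + name);
                    }
                    return tupleValues.get(index);
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> emitted = Arrays.asList("GBPJPY", 134.4563, 134.4354); // pair, bid, ask
        // Mirrors INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?) with fields("pair", "ask", "bid")
        System.out.println(bindValues(emitted, "pair", "ask", "bid")); // [GBPJPY, 134.4354, 134.4563]
    }
}
```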
I tried writing a test based on the answer to "How to E2E test functionality of Storm Topology by programmatically inserting messages".
In my project, I define all my bolts, spouts and streams in a Spring config, which makes the topology very easy to write and read. I build the topology by getting the bolt, spout and stream beans from the ApplicationContext. In the Spring config, KafkaSpout and CassandraWriterBolt are defined under the 'prod' profile so that they are only used in production, and under the 'test' profile I define stubs for them. For KafkaSpout I used FixedTupleSpout, and for CassandraWriterBolt I used TestWordCounter.
This is my test:
@Test
public void testTopology() {
    StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
    TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
        MockedSources mocked = new MockedSources();
        mocked.addMockData("kafkaSpout",
                new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
                new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));
        Config topoConf = new Config();
        topoConf.setNumWorkers(2);
        CompleteTopologyParam ctp = new CompleteTopologyParam();
        ctp.setMockedSources(mocked);
        ctp.setStormConf(topoConf);
        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
        List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
        List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
                Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
        assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
                Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
    };
    MkClusterParam param = new MkClusterParam();
    param.setSupervisors(4);
    Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}
@Configuration
@Import(MainApplication.class)
public static class TestConfig {
    @Bean
    public IRichSpout kafkaSpout() {
        return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"))), new Fields("message"));
    }

    @Bean
    public IBasicBolt cassandraWriterBolt() {
        return new TestWordCounter();
    }
}
The result is not what I expect. I get the following error:
java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]
It looks like TestWordCounter reads only the first value of each tuple (the currency pair) and skips bid and ask, so TestWordCounter doesn't seem to be the right choice here. What would be a correct stub for CassandraWriterBolt that lets me assert it receives two records, one for GBPJPY and one for GBPUSD, each with its bid and ask price?
Testing.readTuples(results, "cassandraWriterBolt") returns the tuples emitted by "cassandraWriterBolt". Is that what you're trying to test? I think you want to assert which tuples "cassandraWriterBolt" receives, not what it emits.
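To see why the emitted tuples contain only the currency pair, here is a plain-Java model of what TestWordCounter does as far as I can tell from Storm's source: it reads only the first value of each input tuple, keeps a running count per value, and emits (value, count). WordCounterModel is a hypothetical name and this is not Storm code; it just reproduces the [[GBPJPY, 1], [GBPUSD, 1]] seen in the assertion error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a word-counting stub: reads field 0 only and emits (field0, runningCount). */
public class WordCounterModel {
    private final Map<Object, Integer> counts = new HashMap<>();
    private final List<List<Object>> emitted = new ArrayList<>();

    public void execute(List<Object> inputValues) {
        Object word = inputValues.get(0);                // bid and ask (fields 1 and 2) are never read
        int count = counts.merge(word, 1, Integer::sum); // running count per first-field value
        emitted.add(Arrays.asList(word, count));         // what readTuples would return for this bolt
    }

    public List<List<Object>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WordCounterModel stub = new WordCounterModel();
        stub.execute(Arrays.asList("GBPJPY", 134.4563, 134.4354));
        stub.execute(Arrays.asList("GBPUSD", 1.4563, 1.4354));
        System.out.println(stub.emitted()); // [[GBPJPY, 1], [GBPUSD, 1]]
    }
}
```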
You can do one of two things here. First, you can use readTuples to read from the bolts that emit to the Cassandra bolt, instead of reading from the Cassandra bolt itself. This is a decent solution if your topology is simple (e.g. not many different bolts write to the Cassandra bolt).
A better solution (IMO) is to write a simple stub bolt to replace TestWordCounter. The only thing the bolt should do is receive the input tuple, ack it, and emit its values in a new tuple; it also needs a declareOutputFields that declares the same fields it receives.
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    // In a basic bolt, Storm acks the input automatically after execute returns
    collector.emit(input.getValues());
}
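A plain-Java model of that pass-through stub makes the difference from TestWordCounter concrete: every input tuple is re-emitted with all of its fields in the same order, so the emitted tuples are exactly the received ones. PassThroughModel is a hypothetical name; this mimics collector.emit(input.getValues()) without any Storm classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the pass-through stub bolt: output equals input (not Storm code). */
public class PassThroughModel {
    private final List<List<Object>> emitted = new ArrayList<>();

    /** Mirrors collector.emit(input.getValues()): all fields, same order. */
    public void execute(List<Object> inputValues) {
        emitted.add(new ArrayList<>(inputValues));
    }

    public List<List<Object>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PassThroughModel stub = new PassThroughModel();
        stub.execute(Arrays.asList("GBPJPY", 134.4563, 134.4354));
        stub.execute(Arrays.asList("GBPUSD", 1.4563, 1.4354));
        System.out.println(stub.emitted()); // [[GBPJPY, 134.4563, 134.4354], [GBPUSD, 1.4563, 1.4354]]
    }
}
```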
Then you can use readTuples to read the tuples that bolt emits, which will be the same values it receives.
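With either approach, the tuples to expect are full (pair, bid, ask) tuples rather than (word, count) pairs, and bid/ask arrive as Double because Gson turns JSON numbers into Double when deserializing into a Map, so string values like "134.4563" would never match. The sketch below shows those expected tuples and an order-insensitive comparison in the spirit of Testing.multiseteq; TupleAssert and multisetEquals are hypothetical stand-ins, not Storm's implementation. In the real test you would pass the same expected list to Testing.multiseteq.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Order-insensitive tuple comparison (hypothetical helper modelled on the idea of Testing.multiseteq). */
public class TupleAssert {
    /** True if both lists contain the same tuples the same number of times, in any order. */
    public static boolean multisetEquals(List<List<Object>> expected, List<List<Object>> actual) {
        return counts(expected).equals(counts(actual));
    }

    // Counts how often each tuple occurs; List equality/hashCode compare element by element.
    private static Map<List<Object>, Integer> counts(List<List<Object>> tuples) {
        Map<List<Object>, Integer> counts = new HashMap<>();
        for (List<Object> tuple : tuples) {
            counts.merge(tuple, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // bid and ask are Double values, matching what Gson produces for JSON numbers in a Map
        List<List<Object>> expected = Arrays.asList(
                Arrays.asList("GBPJPY", 134.4563, 134.4354),
                Arrays.asList("GBPUSD", 1.4563, 1.4354));
        // Tuples may arrive in a different order; the comparison ignores order
        List<List<Object>> actual = Arrays.asList(
                Arrays.asList("GBPUSD", 1.4563, 1.4354),
                Arrays.asList("GBPJPY", 134.4563, 134.4354));
        System.out.println(multisetEquals(expected, actual)); // true
    }
}
```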