I'm trying to create a few unit tests to verify that certain parts of my Trident topology are doing what they are supposed to.
I'd like to be able to retrieve all the values produced by running the topology and put them in a List so I can "see" them and check conditions on them.
FeederBatchSpout feederSpout = new FeederBatchSpout(Arrays.asList("some_time_field", "foo_id"));
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", feederSpout)
        .groupBy(new Fields("some_time_field", "foo_id"))
        .aggregate(new Fields("foo_id"), new FooAggregator(),
                   new Fields("aggregated_foos"))
        // So... how do I retrieve the "aggregated_foos" from here?
I am running the topology as a TrackedTopology (I got the code from another S.O. question; thank you @brianghig for asking it and @Thomas Kielbus for the reply).
This is how I "launch" the topology and how I feed sample values into it:
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());
feederSpout.feed(new Values(MyUtils.makeSampleFoo(1)));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2)));
When I do this, I can see in the log messages that the topology runs correctly and that the values are calculated properly, but I'd like to "fish" the results out into a List (or any structure, at this point) so I can actually put some asserts in my tests.
I've been trying a s**ton of different approaches, but none of them work.
The latest idea was adding a bolt after the aggregation so it would "persist" my values into a list.
Below is the class that tries to go through all the tuples emitted by the aggregate and put them in a list that I had previously initialized:
class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
    final List<AggregatedFoo> results;

    public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
        this.results = results;
    }

    @Override
    public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            results.add((AggregatedFoo) tuple.getValue(0));
        }
    }
}
So now the code would look like:
// ...
List<AggregatedFoo> results = new ArrayList<>();
topology.newStream("spout1", feederSpout)
        .groupBy(new Fields("some_time_field", "foo_id"))
        .aggregate(new Fields("foo_id"), new FooAggregator(),
                   new Fields("aggregated_foos"))
        .partitionPersist(new FieldFetcherFactory(),
                          new Fields("aggregated_foos"),
                          new FieldFetcherStateUpdater(results));
LOGGER.info("Done. Checkpoint results={}", results);
But nothing... The logs show "Done. Checkpoint results=[]" (an empty list).
Is there a way to get that? I imagine it must be doable, but I haven't been able to figure out a way...
Any hint or link to pages or anything of the like will be appreciated. Thank you in advance.
You need to use a static member variable for results. If you have multiple parallel tasks running (i.e., parallelism_hint > 1), you also need to synchronize write access to results.
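To illustrate the synchronization point without a Storm cluster, here is a minimal plain-Java sketch. The class Task is hypothetical and only stands in for one parallel task (one instance of the state updater); several threads each run their own Task, and all of them write into a single static list. The list is wrapped with Collections.synchronizedList, because concurrent add() calls on a plain ArrayList can lose elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StaticResultsDemo {
    // One list for the whole JVM; the synchronized wrapper guards concurrent add() calls.
    static final List<Integer> RESULTS = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for one parallel task (one updater instance).
    static class Task implements Runnable {
        private final int taskId;
        private final int count;

        Task(int taskId, int count) {
            this.taskId = taskId;
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < count; i++) {
                RESULTS.add(taskId * 1_000_000 + i); // every task writes to the same static list
            }
        }
    }

    // Runs `parallelism` tasks concurrently and returns how many results were collected.
    static int runTasks(int parallelism, int perTask) {
        RESULTS.clear(); // a static list survives between tests, so reset it first
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (int t = 0; t < parallelism; t++) {
            pool.submit(new Task(t, perTask));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return RESULTS.size();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(4, 10_000)); // prints 40000
    }
}
```

Note the RESULTS.clear() call: because the static list lives as long as the JVM, each test should reset it before running its topology, or results from earlier tests will leak into later assertions.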
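In your case, results will be empty because Storm does not run the object you constructed: it serializes the topology's components and internally creates a new instance of your bolt (including a new instance of the ArrayList) from that serialized form. Your test keeps a reference to the original list, which is never written to. A static variable is not part of the serialized state, so using one ensures that you get access to the correct object (there is only one, shared by all instances of your bolt). This works in unit tests because a LocalCluster runs all tasks inside the test's JVM; on a real multi-worker cluster, each worker JVM would have its own copy of the static list.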
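The "new instance" effect can be reproduced with plain Java serialization. The sketch below assumes (as described above) that each task receives a deserialized copy of the component; InstanceFieldUpdater, StaticFieldUpdater and roundTrip are hypothetical names, not Storm classes. The first updater mirrors the question's design (list passed into the constructor), the second the answer's fix (static list).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SerializationCopyDemo {
    // Mirrors the question: the list is an instance field, so it is serialized and copied.
    static class InstanceFieldUpdater implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> results;

        InstanceFieldUpdater(List<String> results) {
            this.results = results;
        }

        void update(String value) {
            results.add(value);
        }
    }

    // Mirrors the answer: static fields are not serialized, so every copy in this JVM shares it.
    static class StaticFieldUpdater implements Serializable {
        private static final long serialVersionUID = 1L;
        static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());

        void update(String value) {
            RESULTS.add(value);
        }
    }

    // Serialize and deserialize an object, yielding an independent copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> testList = new ArrayList<>();
        InstanceFieldUpdater running = roundTrip(new InstanceFieldUpdater(testList));
        running.update("aggregated_foo_1");
        System.out.println(testList.size());        // prints 0: the copy wrote to its own list
        System.out.println(running.results.size()); // prints 1

        StaticFieldUpdater.RESULTS.clear();
        StaticFieldUpdater runningStatic = roundTrip(new StaticFieldUpdater());
        runningStatic.update("aggregated_foo_1");
        System.out.println(StaticFieldUpdater.RESULTS.size()); // prints 1: shared static list
    }
}
```

The instance-field version reproduces the empty "results=[]" from the question, while the static version makes the written value visible to the code that holds the class, which is what a test running in the same JVM needs.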