I have read about the Apache storm and did some basic tutorials. I have following topology in mind that I would like to implement with storm, but not sure how to handle the data distribution. Business requirement is to evaluate customers portfolio in realtime. In simplified form it involves: 1) Accept live steam of market prices (currencies, commodities, etc...) 2) For every price tick calculate current profit of every position and convert it to customer account currency 3) Analyze total p/l and volume of all positions per customer and generate signals if required 4) At customer level calculation must be sequential and atomic/serialized. I.e. all positions must be evaluated with every tick in the order it entered the system and totals must be calculated based on the same price even if customer has 100s of positions. 5) Analyze volumes / trends of all positions in system aggregated by symbol/customer type/country /etc... and make them available in some kind of a dashboard.
All orders are executed and stored in rdbms. My major question is how to distribute 100s of thousands of positions across Storm bolts on different nodes that every node handles it's own part. Using Modulo is good enough for partitioning the customers, but how can I provide id to every instance of bolt so each of them handles it's own equal part of customers only? Is there something out of the box in Storm to do that? Another question is how to do above aggregations efficiently?
you can use fieldsGrouping
for that. you can declare a field by which tuples are grouped(in your case, id
).
I'll just suppose that your input stream is JSON object with id and body field like
{"id":"1234","body":"some body"}
Also suppose your topology has one spout, two bolts namely BoltA and BoltB.
In BoltB, override declareOutputFields method and fill in the detail.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","log"));
}
And you can declare topology like below
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("boltA", new BoltA(), 1)
.shuffleGrouping("spout");
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));
In this case, tuples with same id from boltA
will be delivered to same instance of boltB