The current Hazelcast Jet 0.6.1 code sample demonstrates aggregation based on single field (e.g. ticker).
Here is a reference.
\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java
How this can be extended for more than one like ticker, traderId etc.
Here is current sample code from the StockExchange.java
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
.addTimestamps(Trade::getTime, 3000)
.groupingKey(Trade::getTicker)
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting(),
(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
.drainTo(Sinks.logger());
return p;
}
For ticker & traderId you can use:
.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))
Generally, the key can by anything that implements equals
and hashCode
properly. Tuple2
is a generic container for two values.