hazelcast-jet

Hazelcast Jet 0.6.1 - Aggregation on multiple fields


The current Hazelcast Jet 0.6.1 code sample demonstrates aggregation based on a single field (e.g. ticker).

Here is a reference.

\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java

How can this be extended to group by more than one field, such as ticker and traderId?

Here is the current sample code from StockExchange.java:

private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
            alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
     .addTimestamps(Trade::getTime, 3000)
     .groupingKey(Trade::getTicker)
     .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
     .aggregate(counting(),
             (winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
     .drainTo(Sinks.logger());

    return p;
}

Solution

  • For ticker & traderId you can use:

    .groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))
    

    Generally, the key can be anything that implements equals and hashCode properly. Tuple2 is a generic container for two values.
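To illustrate the equals/hashCode point, here is a minimal sketch of a hand-written composite key, as an alternative to Tuple2. It deliberately uses plain Java streams instead of a Jet pipeline so it runs without a cluster. The class name `TickerTraderKey`, the helper `countByKey`, and the `int` type of `traderId` are assumptions made for illustration; they are not part of the sample code. The key is also marked `Serializable` on the assumption that Jet may need to send keys between cluster members.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class CompositeKeyExample {

    // Hypothetical composite key: two trades fall into the same group
    // only if both ticker and traderId match.
    public static final class TickerTraderKey implements Serializable {
        private final String ticker;
        private final int traderId; // assumed type; the real Trade class may differ

        public TickerTraderKey(String ticker, int traderId) {
            this.ticker = ticker;
            this.traderId = traderId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TickerTraderKey)) {
                return false;
            }
            TickerTraderKey that = (TickerTraderKey) o;
            return traderId == that.traderId && Objects.equals(ticker, that.ticker);
        }

        @Override
        public int hashCode() {
            // Must be derived from the same fields that equals() compares.
            return Objects.hash(ticker, traderId);
        }

        @Override
        public String toString() {
            return ticker + "/" + traderId;
        }
    }

    // Counts how often each key occurs. This is a plain-Java stand-in for
    // groupingKey(...) followed by aggregate(counting()), without windowing.
    public static Map<TickerTraderKey, Long> countByKey(List<TickerTraderKey> keys) {
        return keys.stream()
                .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<TickerTraderKey> keys = Arrays.asList(
                new TickerTraderKey("AAPL", 1),
                new TickerTraderKey("AAPL", 1),
                new TickerTraderKey("AAPL", 2));
        // Equal keys are merged into one group because equals/hashCode agree.
        countByKey(keys).forEach((key, count) -> System.out.println(key + " -> " + count));
    }
}
```

In the pipeline, such a key would presumably be plugged in as `.groupingKey(trade -> new TickerTraderKey(trade.getTicker(), trade.getTraderId()))`. If you stay with Tuple2, its two components should be readable in the output function through `key.f0()` and `key.f1()`.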