spring-boot apache-kafka apache-kafka-streams

Accessing the window start time in an aggregation function


I have a Kafka Streams topology, shown below:

stream
    .filter((key, trade) -> trade.tradeTime != null && trade.tradeTime > todayMillis)
    .groupByKey(Grouped.with(Serdes.String(), JSONSerdes.Trade()))
    .windowedBy(TimeWindows.ofSizeAndGrace(
        Duration.ofMinutes(Convertor.getCandleByResolution(resolution)),
        Duration.ofDays(1)))
    .aggregate(
        OHLC::new,
        (key, value, aggregate) -> aggregate.add(value, key),
        Materialized.<String, OHLC, WindowStore<Bytes, byte[]>>as(stateStoreName)
            .withKeySerde(Serdes.String())
            .withValueSerde(JSONSerdes.OHLC())
    );

How can I access the window start time inside the aggregation function? For example, I want to pass the window start time to the add function, but the key is a String. My (key, value) pair is ("IFTTT", OHLC object).


Solution

  • You cannot: the window start time is not exposed to the aggregator, which receives only the plain record key (a String here). In general there is no need for this information inside the aggregator code.

    Feel free to file a Jira ticket with a feature request: https://issues.apache.org/jira/browse/KAFKA
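If the window start is still needed inside add, one possible workaround is to derive it from the record's own timestamp. This is only a sketch, under two assumptions that are not stated in the question: the windows are tumbling (as TimeWindows.ofSizeAndGrace produces), and the timestamp Kafka Streams uses for windowing equals the trade's tradeTime (for example via a custom TimestampExtractor). The class WindowStartSketch and its method windowStartMs are hypothetical names, not part of the Kafka Streams API.

```java
import java.time.Duration;

// Hypothetical helper, NOT part of Kafka Streams.
// Assumption: tumbling windows, which Kafka Streams aligns to the epoch,
// so a window start is the timestamp rounded down to a multiple of the size.
final class WindowStartSketch {

    private WindowStartSketch() {
    }

    /**
     * Returns the start (epoch millis) of the tumbling window that would
     * contain the given timestamp.
     */
    static long windowStartMs(long timestampMs, Duration windowSize) {
        long sizeMs = windowSize.toMillis();
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        if (timestampMs < 0) {
            throw new IllegalArgumentException("timestamp must be non-negative");
        }
        // Round down to the nearest multiple of the window size.
        return timestampMs - (timestampMs % sizeMs);
    }
}
```

Inside the aggregator, something like WindowStartSketch.windowStartMs(value.tradeTime, windowSize) could then be passed to add. If the record timestamp differs from tradeTime, the computed value will not match the window the record actually lands in. Downstream of aggregate, the result key is a Windowed<String>, and key.window().start() returns the actual window start without any computation.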