I have the following topology in Kafka Streams 7.2.2-ccs:
Or in code:
val groupedStream = StreamsBuilder().stream<String, Quote>("quotes").groupByKey()
for (windowSize in windows()) {
    groupedStream
        .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize))
        .aggregate({ Aggregator() }, { _, quote, aggregator -> aggregator.execute(quote) })
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .to("outputTopic")
}
I am using io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
to monitor the application. I have some questions:
Suppress is not backed by RocksDB; it uses an in-memory buffer. Among the available metrics there should be something like in-memory-suppression
(cf. https://docs.confluent.io/platform/current/streams/monitoring.html#state-store-metrics).
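To check what the suppression buffer actually reports, one option is to bypass Micrometer and dump the raw client metrics. The following is only a sketch: the function name printSuppressionMetrics is made up for illustration, and it assumes the relevant metrics carry "suppression" somewhere in their name or tag keys, which may differ between versions.

```kotlin
import org.apache.kafka.streams.KafkaStreams

// Hypothetical helper: prints every metric whose name or tag keys mention "suppression".
// The substring match is an assumption; adjust it after inspecting the full metric list.
fun printSuppressionMetrics(streams: KafkaStreams) {
    streams.metrics()
        .filterKeys { name ->
            name.name().contains("suppression") ||
                name.tags().keys.any { it.contains("suppression") }
        }
        .forEach { (name, metric) ->
            println("${name.group()} ${name.name()} ${name.tags()} = ${metric.metricValue()}")
        }
}
```

Calling this on the running KafkaStreams instance once the application has processed some records should show which names and tags to look for in the Micrometer registry.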
You should get 3 segments (i.e., 3 RocksDB instances) times the number of partitions, i.e., 9 RocksDB instances in your case with 3 input topic partitions.
No, you cannot control the flushing. However, you can limit RocksDB memory usage via a RocksDBConfigSetter
that you pass in via StreamsConfig (the rocksdb.config.setter setting).
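As a rough sketch of such a config setter: the class name BoundedMemoryRocksDBConfig and both byte limits below are placeholders chosen for illustration, not recommended values. The idea is to share one block cache and one write buffer manager across all RocksDB instances, so the total off-heap usage is bounded regardless of how many segments and partitions exist.

```kotlin
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.BlockBasedTableConfig
import org.rocksdb.Cache
import org.rocksdb.LRUCache
import org.rocksdb.Options
import org.rocksdb.WriteBufferManager
import java.util.Properties

// Hypothetical class name; sizes are illustrative assumptions only.
class BoundedMemoryRocksDBConfig : RocksDBConfigSetter {

    override fun setConfig(storeName: String, options: Options, configs: MutableMap<String, Any>) {
        // Reuse the table config Kafka Streams already set up and point it at the shared cache.
        val tableConfig = options.tableFormatConfig() as BlockBasedTableConfig
        tableConfig.setBlockCache(cache)
        // Count index and filter blocks against the cache limit as well.
        tableConfig.setCacheIndexAndFilterBlocks(true)
        // Memtables of all stores are accounted against the same shared budget.
        options.setWriteBufferManager(writeBufferManager)
        options.setTableFormatConfig(tableConfig)
    }

    override fun close(storeName: String, options: Options) {
        // The cache and write buffer manager are shared by all stores, so they are not closed here.
    }

    companion object {
        private const val TOTAL_OFF_HEAP_BYTES = 256L * 1024 * 1024 // assumption
        private const val TOTAL_MEMTABLE_BYTES = 64L * 1024 * 1024  // assumption
        private val cache: Cache = LRUCache(TOTAL_OFF_HEAP_BYTES)
        private val writeBufferManager = WriteBufferManager(TOTAL_MEMTABLE_BYTES, cache)
    }
}

// Registering the setter in the Streams configuration.
fun streamsProperties(): Properties {
    val props = Properties()
    props[StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG] = BoundedMemoryRocksDBConfig::class.java
    return props
}
```

Because the limits are shared, adding more window sizes to the loop increases the number of RocksDB instances but should not increase the memory ceiling set here.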