I have an Kafka streams application in which I read from a topic, do aggregation and materialize in a KTable. I then create a Stream and run some logic on the stream. Now in the stream processing, I want to use some data from the aforementioned KTable. Once I start the stream app, how do I get access to the KTable stream again? I don't want to push the KTable to a new Topic.
KStream<String, MyClass> source = builder.stream("my-topic");
KTable<Windowed<String>, Long> kTable =
source.groupBy((key, value) -> value.getKey(),
Grouped.<String, MyClass >as("repartition-1")
.withKeySerde(new Serdes.String())
.withValueSerde(new MyClassSerDes()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("test-store")
.withKeySerde(new Serdes.String())
.withValueSerde(Serdes.Long()));
Here I want to use data from the kTable.
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30)))
.toStream()
.filter((k, v) -> {
// Here get the count for the previous Window.
// Use that count for some computation here.
}
You can add the KTable
store to a processor/transformer. For you case, you can replace the filter
with flatTransform
(or any sibling like transform
etc depending if you need access to the key) and connect the store to the operator:
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30))
)
.toStream()
// requires v2.2; otherwise use `transform()`
// if you don't need access to the key, consider to use `flatTransformValues` (v2.3)
.flatTransform(
() -> new Transformer<Windowed<myKey>,
Long,
List<KeyValue<Windowed<myKey>, Long>>() {
private ReadOnlyWindowStore<myKey, Long> store;
public void init(final ProcessorContext context) {
// get a handle on the store by its name
// as specified via `Materialized` above;
// should be read-only
store = (ReadOnlyWindowStore<myKey, Long>)context.getStateStore("str");
}
public List<KeyValue<Windowed<myKey>, Long>> transform(Windowed<myKey> key,
Long value) {
// access `store` as you wish to make a filtering decision
if ( ... ) {
// record passes
return Collection.singletonList(KeyValue.pair(key, value));
} else {
// drop record
return Collection.emptyList();
}
}
public void close() {} // nothing to do
},
"str" // connect the KTable store to the transformer using its name
// as specified via `Materialized` above
);