What I am doing wrong ?
I am writing a ProcessorSupplier for aggregate n records into one. For that I am using List Serdes ...
My problem is that the ArrayList is always empty.
Using Java 21 and Kafka streams 3.7.0
public class KafkaTheBatcherProcessorApiApplication {
public static void main(String[] args) {
final Topology topology = new Topology();
topology.addSource( "source-node", stringSerde.deserializer(), stringSerde.deserializer(), "inputTopic");
topology.addProcessor("aggregate-records",
new BatchProcessorSupplierPersistedStore(),
"source-node");
topology.addSink( "sink-node", "outputTopic", stringSerde.serializer(), listSerde.serializer(), "aggregate-records");
Properties properties = new Properties();
try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
kafkaStreams.start();
}
}
then my Supplier
class BatchProcessorSupplierPersistedStore implements ProcessorSupplier<String, String, String, List<String>> {
@Override
public Set<StoreBuilder<?>> stores() {
return Set.of(
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String()))
);
}
@Override
public Processor<String, String, String, List<String>> get() {
return new Processor<>() {
private ProcessorContext<String, List<String>> context;
private KeyValueStore<String, List<String>> storeList;
@Override
public void init(ProcessorContext<String, List<String>> context) {
this.context = context;
storeList = context.getStateStore("batch-store");
this.context.schedule(Duration.ofSeconds(60), PunctuationType.STREAM_TIME, this::forwardAll);
}
private void forwardAll(final long timestamp) {
storeList.all().forEachRemaining(entry -> {
if (!entry.value.isEmpty()) {
context.forward(...
});
}
@Override
public void process(Record<String, String> record) {
if (storeList.get(record.key()) == null) {
storeList.put(record.key(), new ArrayList<>(10000));
}
storeList.get(record.key()).add(record.value());
}
};
}
}
I added a Store counter:
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("batch-store-counter"), Serdes.String(), Serdes.Integer())
and this one is worked as expected.
Your initialized list is put into the state store, but you never put the updated list into the state store