I am new to Kafka Streams and I am trying to aggregate some streaming data into a KTable using the groupBy function. The problem is the following:
The produced message is a JSON message with the following format:
{
  "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid": "3.17.2493", "csn": "64913009"},
  "op_type": "I",
  "after": {"CODE": "AAAA41", "STATUS": "COMPLETED", "ID": 24},
  "op_ts": "2019-12-24 13:16:40.316941",
  "table": "S_ORDER"
}
I want to isolate the JSON field "after" and then create a KTable whose key is "ID" and whose value is the whole "after" JSON object.
First, I created a KStream to isolate the "after" JSON, and it works fine.
KStream code block (don't pay attention to the if statement, because "before" and "after" have the same format):
KStream<String, String> s_order_list = s_order
    .mapValues(value -> {
        String time;
        JSONObject json = new JSONObject(value);
        if (json.getString("op_type").equals("I")) {
            time = "after";
        } else {
            time = "before";
        }
        // Return only the selected nested object, serialized back to a string
        return json.getJSONObject(time).toString();
    });
The output, as expected, is the following:
...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...
Afterwards, I tried to build a KTable by grouping on the "ID" of the JSON.
KTable code block:
KTable<String, String> s_table = s_order_list
    .groupBy((key, value) -> {
        JSONObject json = new JSONObject(value);
        return json.getString("ID");
    });
This does not compile: I want to create a KTable<String, String>, but groupBy gives me a KGroupedStream<Object, String>.
Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>
In conclusion, the question is: what exactly is a KGroupedStream, and how do I implement a KTable properly?
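groupBy only returns a KGroupedStream, which is an intermediate grouping and not a table yet. After the groupBy processor, you can apply a stateful processor such as aggregate or reduce (these processors return a KTable). You can do something like this: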
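KGroupedStream<String, String> s_table = s_order_list
    .groupBy((key, value) ->
            // "ID" is numeric in the sample message, so read it generically and convert it to a String
            String.valueOf(new JSONObject(value).get("ID")),
        Grouped.with(
            Serdes.String(),
            Serdes.String())
    );
// Note: StringAggregate needs a matching Serde (for example via Materialized.with(...))
// unless the configured default value serde can handle it
KTable<String, StringAggregate> aggregateStrings = s_table.aggregate(
    StringAggregate::new,
    (key, value, aggregate) -> aggregate.addElement(value));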
StringAggregate looks like:
import java.util.ArrayList;
import java.util.List;

public class StringAggregate {
    // Collects every value seen for one key
    private List<String> elements = new ArrayList<>();

    public StringAggregate addElement(String element) {
        elements.add(element);
        return this;
    }
    // other methods
}
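Since the question asks for a table keyed by "ID" whose value is the "after" JSON itself, a reduce that keeps the newest value per key may be a closer fit than collecting every value in a list. In Kafka Streams that would correspond to calling reduce((oldValue, newValue) -> newValue) on the KGroupedStream. The sketch below is not Kafka code: it is a plain-Java model of the "re-key, then fold per key" semantics, so it runs without a broker. The class GroupReduceSketch, the helpers extractId and groupAndReduce, and the third sample record are all hypothetical, and the regex-based ID extraction is only a stand-in for a real JSON parser.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GroupReduceSketch {

    // Hypothetical helper: matches a numeric "ID" field in a flat JSON string.
    // A real application would use a JSON library instead of a regex.
    private static final Pattern ID_PATTERN = Pattern.compile("\"ID\"\\s*:\\s*(\\d+)");

    public static String extractId(String json) {
        Matcher m = ID_PATTERN.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no numeric ID field in: " + json);
        }
        return m.group(1);
    }

    // Models groupBy(keySelector) followed by reduce(reducer):
    // each value is re-keyed, then all values of one key are folded into a single value.
    public static Map<String, String> groupAndReduce(List<String> values,
                                                     Function<String, String> keySelector,
                                                     BinaryOperator<String> reducer) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String value : values) {
            // First value for a key is stored as-is; later ones go through the reducer
            table.merge(keySelector.apply(value), value, reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        // The first two records come from the output above; the third is invented
        // here to show an update for an existing ID.
        List<String> stream = List.of(
            "{\"CODE\":\"AAAA48\",\"STATUS\":\"SUBMITTED\",\"ID\":6}",
            "{\"CODE\":\"AAAA16\",\"STATUS\":\"COMPLETED\",\"ID\":1}",
            "{\"CODE\":\"AAAA48\",\"STATUS\":\"COMPLETED\",\"ID\":6}");

        // Keep only the newest value per key, like an upsert into a table
        Map<String, String> table = groupAndReduce(
            stream, GroupReduceSketch::extractId, (oldValue, newValue) -> newValue);

        table.forEach((id, json) -> System.out.println(id + " -> " + json));
    }
}
```

With a "keep the newest" reducer the result holds one entry per ID, and a later record for the same ID replaces the earlier one, which is the behavior one would usually expect from a table view of the orders.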