jsongroup-byapache-kafkaapache-kafka-streamsktable

Kafka Streams API GroupBy behaviour


I am new in kafka streams and I am trying to aggregate some streaming data into a KTable using groupBy function. The problem is the following:

The produced message is a json msg with the following format:

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

I want to isolate the json field "after" and then create a KTable with "key" = "ID" and value the whole json "after".

Firstly, I created a KStream to isolate the "after" json, and it works fine.

KStream code block: (Don't pay attention to the if statement because "before" and "after" have the same format.)

KStream<String, String> s_order_list = s_order
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                });

The output, as expected, is the following:

...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...

Afterwards, I implement a KTable to groupBy the "ID" of the json.

KTable code block:

  KTable<String, String> s_table = s_order_list
                .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getString("ID");
                });

And there is an error that I want to create KTable<String, String> but I am creating GroupedStream<Object,String>.

Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>

In conclusion, the question is what exactly are KGroupedStreams and how to implement a KTable properly ?


Solution

  • After groupBy processor, you can use a stateful processor, like aggregate or reduce (that processors returns KTable). You can do something like this:

    KGroupedStream<String, String> s_table = s_order_list
                         .groupBy((key, value) ->
                             new JSONObject(value).getString("ID"),
                             Grouped.with(
                                     Serdes.String(),
                                     Serdes.String())
                         );
    
    KTable<String, StringAggregate> aggregateStrings = s_table.aggregate(
                         (StringAggregate::new),
                         (key, value, aggregate) -> aggregate.addElement(value));
    

    StringAggregate looks like:

    public class StringAggregate {
    
        private List<String> elements = new ArrayList<>();
    
        public StringAggregate addElement(String element){
            elements.add(element);
            return this;
        }
        //other methods
    }