javaapache-kafkaapache-kafka-streamsktable

Apache Kafka - Implementing a KTable


I am new to Kafka Streams API and I am trying to create a KTable. I have an input topic: s-order-topic, which is a json format message, as shown below.

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

I read messages from this topic and I want to create a KTable that has as key, the field "after":"ID" and for value all the fields inside the "after" field (except for "ID").

I have successfully created a KTable only when I use the default aggregate functions i.e count. But I have difficulty creating my own aggregate function. Below I present the part of the code that I try to create the KTable.

KTable<Long, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.Long(),Serdes.String()))
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                })
               .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getLong("ID");
                }, Grouped.with(Serdes.Long(), Serdes.String()))
                .aggregate( ... );

How can I implement this KTable?

Am I approaching the problem correctly?

(mapValues -> keep only the "before"/"after" field. groupBy -> Make the ID the key of the message. Aggregate -> ? )


Solution

  • I figured out a solution for my case. I implemented the KTable as shown below:

     KTable<String, String> s_table = builder.stream("s-order-topic",  Consumed.with(Serdes.String(),Serdes.String()))
                    .mapValues(value -> {
                        String time;
                        JSONObject json = new JSONObject(value);
                        if (json.getString("op_type").equals("I")) {
                            time = "after";
                        }else {
                            time = "before";
                        }
                        JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                        return json2.toString();
                    })
                    .groupBy((key, value) -> {
                        JSONObject json = new JSONObject(value);
                        return String.valueOf(json.getLong("ID"));
                    }, Grouped.with(Serdes.String(), Serdes.String()))
                    .reduce((prev,newval)->newval);
    

    The aggregate function is not suitable for this case, instead I used the reduce function.

    The output from the console consumer is shown below:

    15   {"CODE":"AAAA17","STATUS":"PENDING","ID":15}
    18   {"CODE":"AAAA50","STATUS":"SUBMITTED","ID":18}
    4    {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":4}
    19   {"CODE":"AAAA83","STATUS":"SUBMITTED","ID":19}
    18   {"CODE":"AAAA33","STATUS":"COMPLETED","ID":18}
    5    {"CODE":"AAAA38","STATUS":"PENDING","ID":5}
    10   {"CODE":"AAAA1","STATUS":"COMPLETED","ID":10}
    3    {"CODE":"AAAA68","STATUS":"NOT COMPLETED","ID":3}
    9    {"CODE":"AAAA89","STATUS":"PENDING","ID":9}