apache-kafka, collectd, ksqldb

KSQL: creating a stream from a JSON array


My Kafka topic receives data in this format (produced by collectd):

[{"values":[100.000080140372],"dstypes":["derive"],"dsnames":["value"],"time":1529970061.145,"interval":10.000,"host":"k5.orch","plugin":"cpu","plugin_instance":"23","type":"cpu","type_instance":"idle","meta":{"network:received":true}}]

It's a mix of arrays, strings, numbers and a nested object, and the whole thing is wrapped in a JSON array. As a result, I'm having a heck of a time getting KSQL to do anything with this data.

When I create a 'default' stream (without declaring any columns):

create stream cd_temp with (kafka_topic='ctd_test', value_format='json');

I get this result:

ksql> describe cd_temp;

 Field   | Type                      
-------------------------------------
 ROWTIME | BIGINT           (system) 
 ROWKEY  | VARCHAR(STRING)  (system) 
-------------------------------------

Any SELECT returns only ROWTIME and an 8-digit hex value for ROWKEY.

I've spent some time trying to extract the JSON fields, to no avail. What concerns me is this:

ksql> print 'ctd_test' from beginning;
Format:JSON
com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode

Is it possible that this topic can't be used in KSQL at all? Is there a technique for unpacking the outer array to get to the interesting fields inside?


Solution

  • At the time of writing (June 2018), KSQL can't handle a JSON message whose entire content is wrapped in a top-level array. There is a GitHub issue tracking this; I'd suggest adding a +1 vote on it to raise its priority.

    Also, I notice that your CREATE STREAM statement doesn't define the schema of the JSON message. That won't help in this situation, but you will need it for JSON messages that KSQL can read (ones whose top level is an object). Your create statement should look something like:

    create stream cd_temp (values ARRAY<DOUBLE>, dstypes ARRAY<VARCHAR>, etc) with (kafka_topic='ctd_test', value_format='json');
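Until KSQL supports top-level arrays, one workaround (not part of the original answer, and only a sketch) is to unwrap the array before the data reaches KSQL: read each message, split the outer array, and re-publish every element as its own JSON object. The function below shows only the JSON transformation using Python's standard `json` module; the function name `unwrap_collectd_message` is hypothetical, and it assumes each array element is a JSON object like the collectd sample in the question.

```python
import json
from typing import Any, List


def unwrap_collectd_message(raw: str) -> List[str]:
    """Hypothetical helper: turn one message value into one JSON object string per record.

    The sample collectd message wraps its records in a top-level array, which
    KSQL (as of June 2018) cannot read; each returned string is a plain object.
    """
    payload: Any = json.loads(raw)

    # A message that is already a JSON object is passed through unchanged.
    if isinstance(payload, dict):
        return [json.dumps(payload)]

    if not isinstance(payload, list):
        raise ValueError(
            f"expected a JSON array or object, got {type(payload).__name__}"
        )

    records: List[str] = []
    for element in payload:
        # Each element must be an object so KSQL can map its keys to columns.
        if not isinstance(element, dict):
            raise ValueError("array elements must be JSON objects")
        records.append(json.dumps(element))
    return records
```

You would then produce each returned string to a separate topic (for example a hypothetical `ctd_test_flat`) with whichever Kafka client you already use, and point `kafka_topic` in the CREATE STREAM statement above at that topic instead of `ctd_test`.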