
How to parse array-of-arrays with Siddhi


I'm trying to parse a JSON event stream using WSO2 Streaming Integrator / Siddhi, but the events are not getting into the stream properly. I'm running this on the Docker image wso2/streaming-integrator:1.0.0.

I'm sending the following input stream into the SiddhiApp (as an HTTP POST into the Siddhi endpoint):

[
    [ 1502942400000, "4261.480" ],
    [ 1503982400000, "1010.312" ]
]

My stream definition is as follows:

@source(type='http', receiver.url='http://0.0.0.0:5010/stream', @map(type='json', @attributes(
        timestamp = "[0]", sensorvalue = "[1]"
        )))
define stream SensorAlertsStream (
    timestamp long,
    sensorvalue double
);

I'm using Postman to send the data to WSO2 Streaming Integrator, but all I get is errors or no response at all.

I have tried the following attribute configurations, all without success:

 - timestamp = "$.[0]", sensorvalue = "$.[1]"
 - timestamp = "$[0]", sensorvalue = "$[1]"
 - timestamp = "$[0][0]", sensorvalue = "$[0][1]"
 - timestamp = "[0]", sensorvalue = "[1]"

Am I missing something in the source definition or the mapping attributes, or should I conclude that this array-of-arrays input structure is simply not parseable with the siddhi-json mapping?

If it helps, this is my simplified app definition:

@App:name("SO-SensorApp")

@source(type='http', receiver.url='http://0.0.0.0:5010/stream', @map(type='json', enclosing.element="$[0]", @attributes( timestamp = "[0]", sensorvalue = "[1]" )) )
define stream SensorAlertsStream ( timestamp long, sensorvalue double );

@sink(type='log', @map(type='json')) 
define stream log_values( sensorvalue double );

from SensorAlertsStream select sensorvalue insert into log_values;

Solution

  • I tested this in Siddhi Tooling 5.1.2 with siddhi-map-json-5.0.6. The following configuration solves the issue: enclosing.element = "$", timestamp = "[0]", sensorvalue = "[1]".

    @source(type = 'http', receiver.url = 'http://0.0.0.0:8007/stream', 
        @map(type = 'json', enclosing.element = "$", 
            @attributes( timestamp = "[0]", sensorvalue = "[1]" )) )
    @sink(type = 'log') 
    define stream SensorAlertsStream1 ( timestamp long, sensorvalue double );
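
    For reference, here is a rough sketch of the simplified app from the question with the same mapping applied. It has not been run against wso2/streaming-integrator:1.0.0; the port 5010 and the stream names are taken from the question, and the plain log sink (without a JSON mapper) follows the form used in the tested snippet above, so treat those choices as assumptions rather than requirements.

```siddhi
@App:name("SO-SensorApp")

-- enclosing.element = "$" points the mapper at the outer array, so each
-- inner array is handled as its own event; "[0]" and "[1]" are then
-- resolved relative to that inner array.
@source(type = 'http', receiver.url = 'http://0.0.0.0:5010/stream',
    @map(type = 'json', enclosing.element = "$",
        @attributes(timestamp = "[0]", sensorvalue = "[1]")))
define stream SensorAlertsStream (timestamp long, sensorvalue double);

-- Log every event that reaches this stream.
@sink(type = 'log')
define stream log_values (sensorvalue double);

from SensorAlertsStream
select sensorvalue
insert into log_values;
```

    With the payload from the question, this should produce one log entry per inner array, assuming the mapper behaves as it did in the Siddhi Tooling 5.1.2 test.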