wso2siddhiwso2-cep

WSO2CEP SiddhiQL - how to insert the attributes from one stream to another stream


I am using the WSO2 CEP and I have create the following Execution plan:

  define stream sensor1Stream (timestamp string, id string, latitude double, longitude double, altitude double);

  define stream sensor2Stream (timestamp string, id string, latitude double, longitude double, altitude double);

  define stream alertStream (alert_id bool, alert_level string, accuracy_level string, s1_timestamp string, s1_id string, s1_latitude double, s1_longitude double, s1_altitude double, s2_timestamp string, s2_id string, s2_latitude double, s2_longitude double, s2_altitude double);

    from sensor1Stream
    select timestamp as s1_timestamp, id as s1_id, latitude as s1_latitude, longitude as s1_longitude, altitude as s1_altitude
    insert into alertStream(s1_timestamp, s1_id, s1_latitude, s1_longitude, s1_altitude);

    from sensor2Stream
    select timestamp as s2_timestamp, id as s2_id, latitude as s2_latitude, longitude as s2_longitude, altitude as s2_altitude
    insert into alertStream(s2_timestamp, s2_id, s2_latitude, s2_longitude, s2_altitude);

I want to insert the attributes from sensor1Stream and sensor2Stream in the alertStream. I have tried the above way, but does not work because of an error:

"You have an error in your SiddhiQL at line 39:23, extraneous input '(' expecting {, ';'}"

The error is between the alertStream and the parenthesis in the last line of the Execution Plan.

I do not know what I am doing wrong. I would be very grateful if somebody could help me on this matter.

Thanks!


Solution

  • Please find an example of a Siddhi execution plan below:

    @Plan:name('ExecutionPlan')
    
    @Import('org.wso2.event.sensor.stream:1.0.0')
    define stream sensor_stream (meta_timestamp long, meta_isPowerSaverEnabled bool, meta_sensorId int, meta_sensorName string, correlation_longitude double, correlation_latitude double, humidity float, sensorValue double);
    
    @Export('org.wso2.sensor.value.projected.stream:1.0.0')
    define stream sensor_value_projected_stream (meta_sensorId int, correlation_longitude double, correlation_latitude double, humidity float, value double);
    
    from sensor_stream
    select meta_sensorId, correlation_longitude, correlation_latitude, humidity, sensorValue as value
    insert into sensor_value_projected_stream;
    

    This example is taken from Siddhi sample 'Pass-Through/Projection Query in an Execution Plan'.

    You can refer more Siddhi samples here on WSO2 CEP documentation page. This Siddhi QL guide provides complete reference on Siddhi language.