wso2 complex-event-processing siddhi wso2-cep

WSO2 CEP: Siddhi QL: How to alert on events consistently after matching a certain condition


From the stream below, I want to raise an alert on every second event whose temperature exceeds 90 (that is, every 2nd event with temp > 90 needs to be alerted).

InputStream=[1001,91]
InputStream=[1001,86]
InputStream=[1002,70]
InputStream=[1001,85]
InputStream=[1003,70]
InputStream=[1003,85]
InputStream=[1002,70]
InputStream=[1003,70]
InputStream=[1003,87]
InputStream=[1002,70]
InputStream=[1001,95]
InputStream=[1001,96]
InputStream=[1001,97]
InputStream=[1001,98]
InputStream=[1001,98]

I have written something like this:

@Plan:name('TestExecutionPlan')

define stream InputStream (id string, temp int);

partition with (id of InputStream)
begin
from InputStream
select id, temp
having temp > 90  
insert into CriticalStream
end;

from CriticalStream[count(id) == 2]
select id, temp
group by id
--having count(id) == 2
insert into EventReporter;

However, it is alerting only one event in the EventReporter stream.

[Screenshot from Try It]

I am expecting the EventReporter stream to have [1001,97] and [1001,98] as well; right now it has only the record for [1001,95]. Could someone please point out what I am doing wrong here? How can I loop through the events after grouping them? I tried adding window.time and window.length, but did not get the desired output. Any help or guidance would be really appreciated. Thank you.


Solution

  • You don't need a partition here. A filter followed by a lengthBatch window is enough to get the desired output. Try the execution plan below:

    @Plan:name('ExecutionPlan')
    
    @Import('InputStream:1.0.0')
    define stream InputStream (id string, temp int);
    
    /* Filter events with temp > 90 */
    from InputStream[temp > 90]
    insert into CriticalStream;
    
    /* Aggregate within a lengthBatch window, grouping by id */
    from CriticalStream#window.lengthBatch(2)
    select id, temp, count() as count
    group by id
    insert into EventReporter;
    
    /* Just for logging the result in the console */
    from EventReporter#log("Logging EventReporter : ")
    insert into #temp;
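
To see why this plan reports [1001,95], [1001,97] and [1001,98] for the sample input, the filter and the batch window can be modelled outside of Siddhi. The Python sketch below is only an illustration and not Siddhi code: the function name `critical_pairs` is hypothetical, and it assumes that a lengthBatch window combined with group by emits one event per group for each full batch, carrying the last temp seen for that id and the group's count.

```python
def critical_pairs(events, threshold=90, batch_size=2):
    """Rough model of: filter temp > threshold, lengthBatch(batch_size), group by id."""
    reported = []
    batch = []
    for event_id, temp in events:
        if temp <= threshold:
            continue  # mirrors: from InputStream[temp > 90]
        batch.append((event_id, temp))
        if len(batch) == batch_size:
            # The batch is full: aggregate per id, keeping the last temp seen
            # for each id and the number of events that id had in this batch.
            groups = {}
            for eid, t in batch:
                _, count = groups.get(eid, (None, 0))
                groups[eid] = (t, count + 1)
            reported.extend((eid, t, c) for eid, (t, c) in groups.items())
            batch = []  # start collecting the next batch
    return reported


# The sample input from the question, in arrival order.
events = [
    ("1001", 91), ("1001", 86), ("1002", 70), ("1001", 85), ("1003", 70),
    ("1003", 85), ("1002", 70), ("1003", 70), ("1003", 87), ("1002", 70),
    ("1001", 95), ("1001", 96), ("1001", 97), ("1001", 98), ("1001", 98),
]

print(critical_pairs(events))
# [('1001', 95, 2), ('1001', 97, 2), ('1001', 98, 2)]
```

Note that in this model the batch is shared by all ids, so two consecutive critical events from different ids fill one batch and each is reported with a count of 1. If the pairs must be formed per id, it may be worth keeping the window inside a partition on id; treat that as a suggestion to verify against your CEP version.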