From the stream below, I want to raise an alert on every second event whose temperature exceeds 90 (that is, every 2nd event with temp > 90 should be alerted).
InputStream=[1001,91]
InputStream=[1001,86]
InputStream=[1002,70]
InputStream=[1001,85]
InputStream=[1003,70]
InputStream=[1003,85]
InputStream=[1002,70]
InputStream=[1003,70]
InputStream=[1003,87]
InputStream=[1002,70]
InputStream=[1001,95]
InputStream=[1001,96]
InputStream=[1001,97]
InputStream=[1001,98]
InputStream=[1001,98]
I have written something like this:
@Plan:name('TestExecutionPlan')
define stream InputStream (id string, temp int);
partition with (id of InputStream)
begin
from InputStream
select id, temp
having temp > 90
insert into CriticalStream
end;
from CriticalStream[count(id) == 2]
select id, temp
group by id
--having count(id) == 2
insert into EventReporter;
However, it's alerting only 1 event in the EventReporter stream.
Below is the screenshot from Try It.
I expect the EventReporter stream to contain [1001,97] and [1001,98] as well; right now it contains only the record for [1001,95]. Could someone please point out what I am doing wrong here? How can I loop through the events after grouping them? I tried adding window.time and window.length, but did not get the desired output. Any help or guidance would be really appreciated. Thank you.
You don't need a partition here. You can simply use a filter and a lengthBatch window to get your desired output. Try the execution plan below:
@Plan:name('ExecutionPlan')
@Import('InputStream:1.0.0')
define stream InputStream (id string, temp int);
/* Filter events with temp > 90 */
from InputStream[temp > 90]
insert into CriticalStream;
/* Aggregate within a lengthBatch window, grouped by id */
from CriticalStream#window.lengthBatch(2)
select id, temp, count() as count
group by id
insert into EventReporter;
/* Just for logging the result in the console */
from EventReporter#log("Logging EventReporter : ")
insert into #temp;
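
To check the output of the execution plan above against what the question asks for, here is a minimal reference model in Python. It is only a sketch of the stated requirement (alert on every 2nd event with temp > 90, counted per id) and not a model of how Siddhi evaluates windows internally. The function name `every_second_hot_event` and its parameters are hypothetical. Note that this sketch counts per id, whereas, as far as I understand, a window used outside a partition is shared by all ids; for the sample data the two should coincide because 1001 is the only id that exceeds 90.

```python
from collections import defaultdict


def every_second_hot_event(events, threshold=90, every=2):
    """Return every `every`-th event per id whose temp exceeds `threshold`.

    Hypothetical helper: models the requirement from the question,
    not Siddhi's window implementation.
    """
    hot_count = defaultdict(int)  # number of qualifying events seen per id
    alerts = []
    for event_id, temp in events:
        if temp > threshold:
            hot_count[event_id] += 1
            # Alert on the 2nd, 4th, 6th, ... qualifying event of this id
            if hot_count[event_id] % every == 0:
                alerts.append((event_id, temp))
    return alerts


# Sample input taken from the question, in arrival order
events = [
    ("1001", 91), ("1001", 86), ("1002", 70), ("1001", 85), ("1003", 70),
    ("1003", 85), ("1002", 70), ("1003", 70), ("1003", 87), ("1002", 70),
    ("1001", 95), ("1001", 96), ("1001", 97), ("1001", 98), ("1001", 98),
]

alerts = every_second_hot_event(events)
print(alerts)  # [('1001', 95), ('1001', 97), ('1001', 98)]
```

These three alerts match the records expected in the question: [1001,95], [1001,97] and [1001,98]. If several ids can exceed 90 at the same time, compare the plan's output with this model to decide whether the window needs to be applied per id.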