I have some related questions :
Second, once the initialization is done I would like to update this table. How can I execute in the same execution plan a query2 once the query1 execution is finished?
To finish, I have an event with 'altitude' attribute. In my execution plan, for each event, I want to increment count1
of every row of my event table where the num column is smaller than the alitude. I tried it but that doesn't increment count of all rows.
FROM inputStream JOIN counterTable
SELECT count1+1 as count1, altitude as tempNum
update counterTable on counterTable.count1 < tempNum;
FROM inputStream JOIN counterTable
SELECT counterTable.num as theAltitude, counterTable.count1 as countAltitude
INSERT INTO outputStream;
If you want to initialize each time the execution plan gets deployed, you should use an in-memory event table (as shown below). Otherwise, you can simply use an RDBMS event table, where it's already been initialized.
Queries will run in the order that they have defined, but that process will occur per each event (not as a batch, i.e if there two queries which consume from inputStream
, when event 1 comes into inputStream
it goes to query 1, then to query 2, and then only event 2 will get consumed..).
Refer to the below snippet;
/* Define the trigger to be used with initialization */
define trigger triggerStream at 'start';
/* Define streams */
define stream inputStream (altitude int);
define stream outputStream (theAltitude long, countAltitude int);
/* Define table */
define table counterTable (num long, count1 int, count2 int, tempNum int);
/* Iterate and generate 100 events */
from triggerStream[count() < 100]
insert into triggerStream;
/* Using above 100 events, initialize the event table */
from triggerStream
select count() as num, 0 as count1, 0 as count2, 0 as tempNum
insert into counterTable;
/* Perform the update logic here */
from inputStream as i join counterTable as c
on c.count1 < i.altitude
select c.num, (c.count1 + 1) as count1, c.count2, altitude as tempNum
insert into updateStream;
from updateStream
insert overwrite counterTable
on counterTable.num == num;
/* Join the table and get the updated results */
from inputStream join counterTable as c
select c.num as theAltitude, c.count1 as countAltitude
insert into outputStream;