wso2complex-event-processingwso2-dassiddhi

WSO2 DAS SiddhiQL: Initialize an event table and update it after


I have some related questions :

  1. Fisrt, I want to initialize an event table with default values and 100 rows like on this picture:

How I want to initialize my event table named 'counterTable'.

  1. Second, once the initialization is done I would like to update this table. How can I execute in the same execution plan a query2 once the query1 execution is finished?

  2. To finish, I have an event with 'altitude' attribute. In my execution plan, for each event, I want to increment count1 of every row of my event table where the num column is smaller than the alitude. I tried it but that doesn't increment count of all rows.

    FROM inputStream JOIN counterTable
    SELECT count1+1 as count1, altitude as tempNum
    update counterTable on counterTable.count1 < tempNum;
    
    FROM inputStream JOIN counterTable
    SELECT counterTable.num as theAltitude, counterTable.count1 as countAltitude
    INSERT INTO outputStream;
    

Solution

    1. If you want to initialize each time the execution plan gets deployed, you should use an in-memory event table (as shown below). Otherwise, you can simply use an RDBMS event table, where it's already been initialized.

    2. Queries will run in the order that they have defined, but that process will occur per each event (not as a batch, i.e if there two queries which consume from inputStream, when event 1 comes into inputStream it goes to query 1, then to query 2, and then only event 2 will get consumed..).

    3. Refer to the below snippet;

      /* Define the trigger to be used with initialization */
      define trigger triggerStream at 'start';
      
      /* Define streams */
      define stream inputStream (altitude int);
      define stream outputStream (theAltitude long, countAltitude int);
      
      /* Define table */
      define table counterTable (num long, count1 int, count2 int, tempNum int);
      
      /* Iterate and generate 100 events */
      from triggerStream[count() < 100]
      insert into triggerStream;
      
      /* Using above 100 events, initialize the event table */
      from triggerStream
      select count() as num, 0 as count1, 0 as count2, 0 as tempNum
      insert into counterTable;
      
      /* Perform the update logic here */
      from inputStream as i join counterTable as c
          on c.count1 < i.altitude
      select c.num, (c.count1 + 1) as count1, c.count2, altitude as tempNum
      insert into updateStream;
      
      from updateStream
      insert overwrite counterTable
          on counterTable.num == num;
      
      /* Join the table and get the updated results */
      from inputStream join counterTable as c
      select c.num as theAltitude, c.count1 as countAltitude
      insert into outputStream;