Tags: wso2, complex-event-processing, siddhi, wso2-cep

How to add arrival and ingestion timestamps in Siddhi CEP


I want to record an arrival timestamp for each stream event as soon as it arrives at the WSO2 IoT server, and an ingestion timestamp when the event is consumed by the CEP engine. These times will be used to compute the queuing latency and the CEP latency as follows.

Queuing latency = ingestion time - arrival time
CEP latency = detection time - ingestion time

Below is my execution plan:

@Plan:name('Server_CEP')
@Plan:statistics('true')
@Plan:trace('true')
@plan:async(bufferSize='1024')

@Import('stream2_scep:1.0.0')
define stream eeg_stream (meta_sensorID_s2 int, meta_tupleID_s2 int, value_s2 int, generationTime_s2 long);

@Import('stream1_scep:1.0.0')
define stream ecg_stream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);

@Export('cep_stream_scep:1.0.0')
define stream CEPStream (cep_event int, cepLatency long);

from every ecg = ecg_stream[value_s1 >= 50] -> eeg = eeg_stream[value_s2 >= 50] within 10 sec
select ecg.value_s1 as cep_event, convert(time:currentTimestamp(), 'long') - ecg.generationTime_s1 as cepLatency
insert into CEPStream;

I am able to get the detection time as the current time when the CEP event is detected. I am also using @async with a buffer size of 1024. The first issue is how to timestamp the arrival of each event as soon as it reaches the stream. The second issue is how to add an engine ingestion timestamp.

Can someone tell me how I can achieve this?

PS: I was able to achieve this on an Android device because I used a non-blocking queue: the arrival time was the time an event was put into the FIFO queue, and the ingestion time was the time it was dequeued.


Solution

  • I did this by creating a separate execution plan that receives a stream x, adds a timestamp to each event, and sends it to another stream y. An example of such an execution plan:

    @Plan:name('scep_s1_arrival_timestamping')
    
    @Plan:statistics('false')
    
    @Plan:trace('false')
    
    @Import('stream1_scep:1.0.0')
    define stream inputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);
    
    @Export('stream1_scep:2.0.0')
    define stream outputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long, arrivalTime_s1 long);
    
    from  inputStream
    select meta_sensorID_s1 as meta_sensorID_s1, meta_tupleID_s1 as meta_tupleID_s1, value_s1 as value_s1, generationTime_s1 as generationTime_s1, convert(time:timestampInMilliseconds(), 'long') as arrivalTime_s1
    insert into outputStream;
    

    The timestamp is added by convert(time:timestampInMilliseconds(), 'long') as arrivalTime_s1. Note that convert is used to convert the value to the long datatype before it is stored in the arrivalTime_s1 attribute.
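
The same idea can probably be reused for the ingestion timestamp: let the first query of the main plan stamp each event before the pattern query sees it, then derive both latencies from the stamped attributes. The following is an untested sketch under that assumption. The names ingestionTime_s1, ecg_ingested and queuingLatency, and version 2.0.0 of cep_stream_scep, are hypothetical and do not appear in the original plans. Whether the stamp reflects the moment the event leaves the async buffer depends on how the engine schedules the queries, so treat the numbers as approximate.

```
@Plan:name('Server_CEP')
@Plan:statistics('true')
@Plan:trace('true')
@plan:async(bufferSize='1024')

@Import('stream2_scep:1.0.0')
define stream eeg_stream (meta_sensorID_s2 int, meta_tupleID_s2 int, value_s2 int, generationTime_s2 long);

-- Version 2.0.0 is the output of the arrival-timestamping plan above.
@Import('stream1_scep:2.0.0')
define stream ecg_stream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long, arrivalTime_s1 long);

-- Assumption: a new version of the exported stream with an extra attribute.
@Export('cep_stream_scep:2.0.0')
define stream CEPStream (cep_event int, queuingLatency long, cepLatency long);

-- Stamp the ingestion time when the engine first processes the event.
-- ecg_ingested is a hypothetical intermediate stream.
from ecg_stream
select meta_sensorID_s1, meta_tupleID_s1, value_s1, generationTime_s1, arrivalTime_s1,
       convert(time:timestampInMilliseconds(), 'long') as ingestionTime_s1
insert into ecg_ingested;

-- Queuing latency = ingestion time - arrival time
-- CEP latency     = detection time - ingestion time
from every ecg = ecg_ingested[value_s1 >= 50] -> eeg = eeg_stream[value_s2 >= 50] within 10 sec
select ecg.value_s1 as cep_event,
       ecg.ingestionTime_s1 - ecg.arrivalTime_s1 as queuingLatency,
       convert(time:timestampInMilliseconds(), 'long') - ecg.ingestionTime_s1 as cepLatency
insert into CEPStream;
```

Only the ecg side is stamped here because the latencies in the question are computed from the ecg event; the eeg stream would need its own timestamping plan if its latencies are also required.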