group-bystreamdistinctesper

Esper distinct events on multiple attributes


I have a problem with the stream semantics in Esper. My aim is to output only events with pairwise distinct attributes. Additionally, there are temporal conditions which have to hold between the attributes (see Espers Interval Algebra Reference).

An example statement:

insert into output_stream select a.*, b.*
from stream1#length(100) as a, stream2#length(100) as b
where a.before(b) or a.meets(b) or a.overlaps(b)

Pairwise distinct attributes means, I want to ensure that there are no two outputs o1, o2 where o1.a = o2.a or o1.b = o2.b. To give a more concrete example, if there are the results

o1: (a = a1, b = b1),
o2: (a = a1, b = b2),
o3: (a = a2, b = b2),
o4: (a = a2, b = b1)

only two of them shall be output (e.g. o1 and o3 or o2 and o4). Which one does not matter for now.

I wanted to accomplish the pairwise distinct attributes with a NOT EXISTS clause like this:

NOT EXISTS ( 
    select * from output_stream#length(100) as otherOutput 
    where a = otherOutput.a or b = otherOutput.b )

which works partly, for successive output the assertion o1.a = o2.a or o1.b = o2.b always holds.

However, when stream1 first delivers multiple "a"s and then stream2 delivers one "b", that matches the conditions to be joined with both "a"s, there are multiple outputs at once. This is not covered by my NOT EXISTS clause, because in the same step multiple outputs with the same "b" occur, and thus they are not yet in the output_stream.

The distinct keyword is not suitable here, since it checks all attributes together and not pairwise. Likewise, a simple group by on all attributes is unsuitable. I would love to have something like "distinct on a and distinct on b" as a criterion, but it does not exist.

I could possibly solve this with nested group bys where I group on each attribute

select first(*) from (select first(*) from output_stream group by a) group by b

but according to one comment has no well-defined semantics in stream processing systems. Thus, Esper does not allow subqueries in the from part of the query.

What I need is a way to force only output one output at a time and thus have the NOT EXISTS condition rechecked on every further output, or somehow check the outputs that occur at the same time against one another, before actually inserting them into the stream.

Update: Timing of the output is not very critical. The output_stream will be used by other such statements, so I can account for delays by increasing the length of the windows. stream1 and stream2 deliver events in the order of their startTimestamp property.


Solution

  • create schema Pair(a string, b string);
    create window PairWindow#length(100) as Pair;
    insert into PairWindow select * from Pair;
    on PairWindow as arriving select * from PairWindow as other  
      where arriving.a = other.a or arriving.b = other.b
    

    Here is a sample self-join using a named window that keeps the last 100 pairs.

    EDIT: Above query was designed for my understanding of the original requirements. Below query is designed for the new clarifications. It checks whether "a" or "b" had any previous value (in the last 100 events, leave #length(100) off as needed)

    create schema Pair(a string, b string);
    create window PairUniqueByA#firstunique(a)#length(100) as Pair;
    create window PairUniqueByB#firstunique(b)#length(100) as Pair;
    
    insert into PairUniqueByA select * from Pair;
    insert into PairUniqueByB select * from Pair;
    
    select * from Pair as pair
      where not exists (select a from PairUniqueByA as uba where uba.a = pair.a)
      and not exists (select a from PairUniqueByB as ubb where ubb.b = pair.b);