I have a problem with the stream semantics in Esper. My aim is to output only events with pairwise distinct attributes. Additionally, there are temporal conditions that have to hold between the attributes (see Esper's Interval Algebra Reference).

An example statement:

insert into output_stream select a.*, b.*
from stream1#length(100) as a, stream2#length(100) as b
where a.before(b) or a.meets(b) or a.overlaps(b)

"Pairwise distinct attributes" means that I want to ensure there are no two outputs o1, o2 where o1.a = o2.a or o1.b = o2.b. To give a more concrete example, if there are the results

o1: (a = a1, b = b1),
o2: (a = a1, b = b2),
o3: (a = a2, b = b2),
o4: (a = a2, b = b1)

only two of them shall be output (e.g. o1 and o3, or o2 and o4). Which of the two combinations is chosen does not matter for now.
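To make the requirement concrete, here is a minimal sketch in Python (not EPL) of the selection described above. The function name `pairwise_distinct` and the greedy "first come, first kept" policy are assumptions made for illustration; the question only requires that some valid pair of results survives.

```python
def pairwise_distinct(pairs):
    """Greedily keep pairs whose a and b were not used by an earlier kept pair.

    Hypothetical helper for illustration; not part of Esper.
    """
    seen_a, seen_b = set(), set()
    kept = []
    for a, b in pairs:
        if a in seen_a or b in seen_b:
            continue  # shares an attribute with a pair that was already kept
        seen_a.add(a)
        seen_b.add(b)
        kept.append((a, b))
    return kept


# o1..o4 from the example above
candidates = [("a1", "b1"), ("a1", "b2"), ("a2", "b2"), ("a2", "b1")]
result = pairwise_distinct(candidates)
print(result)  # [('a1', 'b1'), ('a2', 'b2')], i.e. o1 and o3
```

Feeding the candidates in a different order would select o2 and o4 instead, which is equally acceptable under the stated requirement.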

I wanted to accomplish the pairwise distinct attributes with a NOT EXISTS clause like this:

NOT EXISTS ( 
    select * from output_stream#length(100) as otherOutput 
    where a = otherOutput.a or b = otherOutput.b )

This works partly: for successive outputs, the condition o1.a = o2.a or o1.b = o2.b never holds, as intended.

However, when stream1 first delivers multiple "a"s and stream2 then delivers one "b" that satisfies the join conditions with several of those "a"s, multiple outputs are produced at once. My NOT EXISTS clause does not cover this case: the outputs with the same "b" are produced in the same step, so none of them is in output_stream yet when the others are checked.
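The difference between checking a whole batch at once and re-checking after every single output can be modelled with a small Python sketch. This is only a model of the behaviour described above, not Esper's actual evaluation logic; the function names and the list standing in for output_stream are assumptions.

```python
def conflicts(pair, window):
    """True if the pair shares its a or its b with any pair in the window."""
    a, b = pair
    return any(a == other_a or b == other_b for other_a, other_b in window)


def filter_batch(batch, output_window):
    """Check every pair of one batch against the same snapshot of the window."""
    snapshot = list(output_window)
    passed = [pair for pair in batch if not conflicts(pair, snapshot)]
    output_window.extend(passed)
    return passed


def filter_one_by_one(batch, output_window):
    """Re-check the condition after each single output."""
    passed = []
    for pair in batch:
        if not conflicts(pair, output_window):
            output_window.append(pair)  # visible to the next check
            passed.append(pair)
    return passed


# One "b" joins with two "a"s in the same step.
batch = [("a1", "b1"), ("a2", "b1")]
batch_result = filter_batch(batch, [])
sequential_result = filter_one_by_one(batch, [])
print(batch_result)       # [('a1', 'b1'), ('a2', 'b1')]  (duplicate b1 slips through)
print(sequential_result)  # [('a1', 'b1')]
```

The second variant is the behaviour the question is asking for: each output becomes visible to the check before the next candidate is considered.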

The distinct keyword is not suitable here, since it checks all attributes together and not pairwise. Likewise, a simple group by on all attributes is unsuitable. I would love to have something like "distinct on a and distinct on b" as a criterion, but it does not exist.

I could possibly solve this with nested group bys where I group on each attribute

select first(*) from (select first(*) from output_stream group by a) group by b

but according to one comment, such a query has no well-defined semantics in stream processing systems, which is why Esper does not allow subqueries in the from clause.

What I need is a way to force the statement to produce only one output at a time, so that the NOT EXISTS condition is rechecked for every further output, or a way to check the outputs that occur at the same time against one another before actually inserting them into the stream.

Update: Timing of the output is not very critical. The output_stream will be used by other such statements, so I can account for delays by increasing the length of the windows. stream1 and stream2 deliver events in the order of their startTimestamp property.

TAKeanice
  • Instead of throwing some EPL design out, start with the requirements and provide some example events and expected output. From this discussion I can't really guess what pairwise distinct attributes means. It is like someone posting some Java code and expecting people to guess at the requirements. – user650839 Jan 31 '19 at 14:43
  • From-clauses in a streaming processor like Esper have infinite results. From-clauses in a relational database have finite results (just the rows already in the table at the time of the query). Therefore a subquery in the from clause has no defined semantics in stream processing. In stream processing you must therefore always specify WHEN the result should occur. I.e. should the result occur every minute? Or for every incoming event? Or for every incoming event of a certain stream? Or something else? – user3613754 Jan 31 '19 at 15:07
  • Thank you, I will adapt the question to provide some example. Just for a quick response, I consider the attributes of two result tuples (a1, b1), (a1, b2) NOT pairwise distinct. (a1, b1), (a2, b2) or (a2, b1), (a1, b2) however are pairwise distinct. – TAKeanice Jan 31 '19 at 15:18
  • The combination between a and b must not be arbitrary, however, but has to follow specific conditions. For example, there may be the condition that `a.overlaps(b)` (see http://esper.espertech.com/release-8.0.0/reference-esper/html/datetimereference.html#datetime-method-intervalref). – TAKeanice Jan 31 '19 at 15:20
  • After the update there is still no information about WHEN the output should occur. What triggers output? How does the runtime know that all the events have been received and that we now want results? .... What is the trigger of output? – user3613754 Jan 31 '19 at 16:05
  • See this post https://stackoverflow.com/questions/46917625/pairwise-independent-hash-functions-in-java. You maybe could encapsulate the pair into an object and implement "hashCode" and "equals" for it and treat it as a single value. – user650839 Jan 31 '19 at 16:06
  • I could indeed create an equals function that returns true whenever there are two attributes that are the same. The requirement for the hash function would be to play along with this. This could be most easily accomplished by just taking the output time of the object as hash, since I have only a problem when the output happens at the same time. – TAKeanice Jan 31 '19 at 17:28

1 Answer

create schema Pair(a string, b string);
create window PairWindow#length(100) as Pair;
insert into PairWindow select * from Pair;
on PairWindow as arriving select * from PairWindow as other  
  where arriving.a = other.a or arriving.b = other.b

Here is a sample self-join using a named window that keeps the last 100 pairs.

EDIT: The query above was designed for my understanding of the original requirements. The query below is designed for the new clarifications. It checks whether "a" or "b" had any previous value (in the last 100 events; leave #length(100) off as needed).

create schema Pair(a string, b string);
create window PairUniqueByA#firstunique(a)#length(100) as Pair;
create window PairUniqueByB#firstunique(b)#length(100) as Pair;

insert into PairUniqueByA select * from Pair;
insert into PairUniqueByB select * from Pair;

select * from Pair as pair
  where not exists (select a from PairUniqueByA as uba where uba.a = pair.a)
  and not exists (select a from PairUniqueByB as ubb where ubb.b = pair.b);
user650839
  • Thank you for the answer. I only want one `Pair` that contains a certain `a` and one `Pair` that contains a certain `b`. How can I accomplish that? The last statement joins the pairs from which I want to exclude one, so it yields kind of the opposite result. – TAKeanice Feb 02 '19 at 11:23
  • Thanks for your edit! I will look whether this approach works as expected and then come back and report here :) – TAKeanice Feb 02 '19 at 13:15
  • I just looked into the "firstunique" window specifier, which I did not know before. Couldn't I accomplish what I am after by chaining several "firstunique" window specifiers? I.e. `create window OutputPairs#firstunique(a)#firstunique(b)#length(100) as Pair` ? – TAKeanice Feb 02 '19 at 13:20
  • Ok I just realized that this would not account for the possibility that `uba.a = pair.b` and `ubb.b = pair.a` – TAKeanice Feb 02 '19 at 13:35
  • The "#firstunique(a)#length(100)" is the intersection between the first-unique events by "a" and last 100 events. For the additional criteria you could add additional subqueries. – user650839 Feb 04 '19 at 12:44
  • So to help my understanding of the query: When a certain value for "a" arrives, say 1, and there was no "a" with value 1 yet, some Pair(1, something) shows up in PairUniqueByA. At the same time, that pair is considered by the third statement (select * from Pair as pair). Wouldn't the where clause fail now, because there _does_ exist a Pair (called uba) in PairUniqueByA with uba.a = pair.a? – TAKeanice Feb 05 '19 at 18:02
  • Esper executes event-by-event. Insert-into creates a new and next event to process. Thus "Pair" gets processed first and next does "PairUniqueByA" and then "PairUniqueByB". – user650839 Feb 06 '19 at 12:57
  • Thank you for the explanation. I will include it into the answer in a suggested edit. Still, one thing is missing, since I do not want Pairs with an "a" that appears in other Pairs as "b". So one of Pair(1,2) and Pair(2,3) should get excluded. This would be possible by another two NOT EXISTS subqueries, right? – TAKeanice Feb 06 '19 at 13:47
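
The additional cross-attribute criterion from the last comment (no value may appear as "a" in one pair and as "b" in another) can be sketched as a plain Python model. This is not EPL and not the answerer's query: the function name `accept_pairs` is hypothetical, and the sketch assumes that only accepted pairs count as "used", which may differ from windows that record every incoming Pair.

```python
def accept_pairs(pairs):
    """Keep a pair only if neither of its values was used before, in either role.

    Hypothetical model of four NOT EXISTS checks:
    a vs. earlier a, b vs. earlier b, a vs. earlier b, b vs. earlier a.
    """
    used = set()  # every value seen as a or as b in an accepted pair
    accepted = []
    for a, b in pairs:
        if a in used or b in used:
            continue  # value already appears in an accepted pair
        used.update((a, b))
        accepted.append((a, b))
    return accepted


accepted = accept_pairs([(1, 2), (2, 3), (4, 5)])
print(accepted)  # [(1, 2), (4, 5)]; Pair(2, 3) is excluded because 2 was a "b"
```

Keeping a single set for both roles is what makes Pair(1,2) and Pair(2,3) mutually exclusive; with separate sets per attribute, as in the per-attribute checks, both would pass.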