1

My intention is to merge attributes from two streams in siddhi.

I'm using the "join" with window attribute to merge two streams in my siddhi query and inputting the result of the join to another stream to enrich it.

The window attributes (window.time(1 sec) or window.length(1)) works well when the incoming events are coming at a regular interval of 1 sec or more.

When (say for example 10 or 100) events are sent at the same time(within a second). Then the result of the join is not in expected terms.

from EventInputStreamOne#window.time(1 sec) as i
        join EventInputStreamTwo as s
        on i.variable2 == s.variable2
select i.variable1 as variable1, i.variable2 as variable2, s.variable2 as variable2
insert into EventOutputStream;

Kindly let me know whether is there any way to merge two streams where the incoming events has to be treated/processed uniquely even when the number of events sent at a time(within a second) are large.

1 Answers1

0

Can you elaborate more on what you meant by

Then the result of the join is not in expected terms.

Which stream EventInputStreamOne or EventInputStreamTwo is receiving the events in a higher rate.

As per your Siddhi app, events comes to EventInputStreamOne stream within 1sec will be compared with each event from EventInputStreamTwo and the matched events will be output to EventOutputStream.

If you want to compare the events that come to both EventInputStreamOne and EventInputStreamTwo within 1sec, have a time window of 1sec at EventInputStreamTwo as well.

Check the Siddhi documentation on Joins for more details.

Chiran Fernando
  • 427
  • 3
  • 6
  • Am trying to enrich my input stream with an additional attribute which gets populated via "http-response" response sink.I have tried using the "join" with window attribute and with "every" keyword to merge two streams and inserting the resulting merged stream into another stream to enrich it.'EventInputStreamOne' is receiving events at a higher rate.EventInputStreamTwo is getting an additional attribute which gets populated via "http-response" sink and these two will be merged and sent to EventOutputStream.When you send 10 events at a time, the merge is happening only for say 3 or 4 events. – Vigneshwaran Oct 09 '19 at 10:07
  • "If you want to compare the events that come to both EventInputStreamOne and EventInputStreamTwo within 1sec, have a time window of 1sec at EventInputStreamTwo as well." I tried this but here the merge is happening twice(duplication)..Even here not all the 10 events are getting merged.. – Vigneshwaran Oct 09 '19 at 10:10
  • Can you share the Siddhi app you are testing removing sensitive information – Chiran Fernando Oct 09 '19 at 10:19
  • Due to character limit constraint, am sharing each of my streams as a separate comment... – Vigneshwaran Oct 09 '19 at 11:11
  • `code` @source(type='http', receiver.url='http://0.0.0.0:9030/event', @map(type = 'json', @attributes(alpha = '$.alpha', beta = '$.beta', text = '$.text'))); define stream EventInputStreamOne(alpha string, beta string, text string) – Vigneshwaran Oct 09 '19 at 11:21
  • @source(type='http-response', sink.id='EventStream', @map(type='json',@attributes(gamma = 'intent', text = 'text'))) @sink(type='log') define stream EventInputStreamTwo(gamma string, text string); – Vigneshwaran Oct 09 '19 at 11:21
  • from EventInputStreamOne#window.time(1 sec) as i join EventInputStreamTwo as s on i.text == s.text select i.alpha as alpha, i.beta as beta, s.text as text insert into EventOutputStream; – Vigneshwaran Oct 09 '19 at 11:21
  • @sink(type='log', prefix='TEST JOIN ') define stream EventOutputStream (alpha string, beta string, text string); – Vigneshwaran Oct 09 '19 at 11:24