
I am trying to enrich my input stream with an additional attribute that is populated from the response received through the "http-response" source.

I have tried using a "join" with a window, and also the "every" keyword, to merge the two streams and insert the merged result into another stream to enrich it.

The window attributes (window.time(1 sec) or window.length(1)) and the "every" keyword work well when the incoming events arrive at regular intervals of 1 sec or more.

When many events (say 10 or 100) are sent at the same time (within a second), the merge does not produce the expected result.

The one with the "window" attribute (join)

from EventInputStreamOne#window.time(1 sec) as i
        join EventInputStreamTwo as s
        on i.variable2 == s.variable2
select i.variable1 as variable1, i.variable2 as variable2, s.variable3 as variable3
insert into EventOutputStream;

The one with the "every" keyword

from every e1=EventInputStream,e2=EventResponseStream
select e1.variable1 as variable1, e1.variable2 as variable2, e2.variable3 as variable3
insert into EventOutputStream;

Is there any better way to merge the two streams in order to update a third stream?

2 Answers


To get the original request attributes, you can use custom mapping as follows:

@source(type='http-call-response', sink.id='source-1',
       @map(type='json',@attributes(name='name', id='id', volume='trp:volume', price='trp:price')))
define stream responseStream(name String, id int, headers String, volume long, price float);

Here, the request attributes can be accessed with trp:attributeName. In this sample, name and id come from the response, while volume and price come from the request.
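Applied to the streams in the question, the same idea might look like the sketch below. It assumes the Siddhi 4 extension types mentioned in the comments ("http-request" sink, "http-response" source). The URL, the sink.id value, the payload shape and the response field variable3 are placeholders for illustration, not taken from the original app.

```siddhi
-- Sketch only: the URL, sink.id, payload shape and the response field 'variable3' are assumed.
-- Attributes of the request stream can be read back on the response side as trp:<name>,
-- even when they are not part of the payload that is sent.
@sink(type='http-request', sink.id='enrich-1',
      publisher.url='http://localhost:8080/lookup', method='POST',
      @map(type='json', @payload("""{"variable2":"{{variable2}}"}""")))
define stream EventInputStream (variable1 string, variable2 string);

-- variable1 and variable2 come from the request (trp:), variable3 from the response body.
@source(type='http-response', sink.id='enrich-1',
        @map(type='json',
             @attributes(variable1='trp:variable1', variable2='trp:variable2', variable3='variable3')))
define stream EventOutputStream (variable1 string, variable2 string, variable3 string);
```

With this layout the response stream already carries the request attributes, so no join or pattern between the input and response streams should be needed, and the result should not depend on how many events arrive per second.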

Niveathika
  • I hope what you have suggested is for merging the input and response streams and inserting the result into a third stream. I have tried it, but I am getting a "No extension exist for source:http-call-response" error. I also tried 'http-response', but the result is not as expected. – Vigneshwaran Oct 03 '19 at 07:21
  • http-call-response is the latest source, replacing the deprecated http-response source. Which version of Siddhi are you trying this on, and which io-http extension version? – Niveathika Oct 03 '19 at 07:24
  • I'm using version 1.2.0 of Siddhi, which comes built in with WSO2 version 4.4.0. Do I have to update the jar file? – Vigneshwaran Oct 03 '19 at 10:59
  • It seems the latest release is 2.0.5: https://mvnrepository.com/artifact/org.wso2.extension.siddhi.io.http/siddhi-io-http. Let's try with that; if it does not work, please share the Siddhi app. – Niveathika Oct 03 '19 at 12:56
  • I have tried version 2.0.5 and even the more recent 2.1.2, but no luck. I am getting the same error. Please find my Siddhi app below. Due to the character limit, I am sharing each of my streams as a separate comment. – Vigneshwaran Oct 08 '19 at 10:11
  • @info(name='Input') from InputStream select str:replaceAll(text, ' ', '%20') as x, str:concat(str:lower(beta), '_1000001') as name, str:concat(str:lower(beta), '_1000001_id') as id, str:lower(beta) as beta, str:lower(gamma) as gamma insert into HitAlphaStream; – Vigneshwaran Oct 08 '19 at 10:11
  • @sink(type='http-call', sink.id="AlphaStream", publisher.url='http://10.236.220.136:5071/parse?x={{x}}&name={{name}}&id={{id}}', method='GET', headers="'Content-Type:text/plain'", @map(type='keyvalue', @payload(x='{{x}}',name='{{name}}',id='{{id}}'))) define stream HitAlphaStream (x string, name string, id string, beta string, gamma string); – Vigneshwaran Oct 08 '19 at 10:13
  • @source(type='http-call-response', sink.id='AlphaStream', @map(type='json',@attributes(event = 'intent', text = 'text', beta='trp:beta', gamma='trp:gamma'))) @sink(type='log') define stream AlphaResponseStream(event string, text string, beta string, gamma string); – Vigneshwaran Oct 08 '19 at 10:13
  • I have also tried upgrading the io-http extension by replacing the existing jar file (siddhi-io-http-1.2.0.jar) inside SP's worker container with the new jar file (siddhi-io-http-2.1.2.jar), but I don't think the version upgrade takes effect this way. Could you please suggest a way to achieve this? – Vigneshwaran Oct 08 '19 at 13:13
  • I am sorry for the confusion, but it seems WSO2 SP 4.4.0 only supports Siddhi 4, and the siddhi-io-http 2.x versions are not compatible with it. You have to use the 1.x version of the http extension. Your syntax is correct; you just have to revert the sink and source types to http-request and http-response and it will work. – Niveathika Oct 10 '19 at 07:20
  • We are already using these sink (http-request) and source (http-response) types. Our issue is not with the request and response streams but with merging the response stream and the input stream (the input stream is different from the request stream, as the request has only the few attributes of the input stream that are needed to get a response). We tried to insert the result of this merge into an output stream. The merge works fine when we send 1 event/sec and fails when we send 5 or 10 events/sec. When we send 10 events/sec, only 3 or 4 response and input events are merged. Please help. – Vigneshwaran Oct 10 '19 at 10:28
  • As discussed offline, trp properties work for http-request and http-response. Please mark the answer as correct after you try it out. – Niveathika Oct 10 '19 at 12:03

The syntax in your 'every' keyword approach isn't quite right. Have you tried something like this:

from every (e1 = event1) -> e2=event2[e1.variable == e2.variable]
select e1.variable1 as e1Variable1, e2.variable1 as e2Variable1, e2.variable2 as variable2
insert into outputEvent;

This document might help.
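Written against the streams from the question, such a pattern could look like the sketch below. It assumes that EventResponseStream carries a variable2 attribute to correlate on (the question does not show this) and that waiting up to 5 sec for a response is acceptable; both are illustrative choices.

```siddhi
-- Sketch only: assumes EventResponseStream has a 'variable2' attribute usable as a correlation key.
-- Each input event starts its own pending match and waits for a response with the same key.
from every (e1=EventInputStream) -> e2=EventResponseStream[e1.variable2 == e2.variable2]
    within 5 sec
select e1.variable1 as variable1, e1.variable2 as variable2, e2.variable3 as variable3
insert into EventOutputStream;
```

Unlike a comma-separated sequence, which expects e2 to be the very next event after e1, a pattern written with -> allows other events to arrive in between, so a burst of inputs followed by a burst of responses can still be paired as long as the keys match. If the key is not unique per request, one response may match more than one pending input.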

Das_Geek
  • Even though the above is correct, in the use case of the http-request sink and http-response source we can use trp properties, which is more efficient. – Niveathika Oct 14 '19 at 04:55