
How to update a stream with the response from another stream where the sink type is "http-response"


I am trying to enrich my input stream with an additional attribute that is populated from the response received through an "http-response" sink.

I have tried a "join" with a window, and a query with the "every" keyword, to merge the two streams and insert the merged result into another stream to enrich it.

Both the window variants (window.time(1 sec) or window.length(1)) and the "every" keyword work well when the incoming events arrive at regular intervals of 1 second or more.

However, when many events (say 10 or 100) are sent at the same time (within one second), the merged result is not what I expect.

The version with a window (join):

    from EventInputStreamOne#window.time(1 sec) as i
        join EventInputStreamTwo as s
        on i.variable2 == s.variable2
    select i.variable1 as variable1, i.variable2 as variable2, s.variable3 as variable3
    insert into EventOutputStream;

The version with the "every" keyword:

    from every e1=EventInputStream,e2=EventResponseStream
    select e1.variable1 as variable1, e1.variable2 as variable2, e2.variable3 as variable3
    insert into EventOutputStream;

Is there any better way to merge the two streams in order to update a third stream?


Solution

  • To get the original request attributes, you can use a custom mapping on the response source, as follows:

    @source(type='http-call-response', sink.id='source-1',
           @map(type='json', @attributes(name='name', id='id', volume='trp:volume', price='trp:price')))
    define stream responseStream(name String, id int, headers String, volume long, price float);
    

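    Here, the request attributes can be accessed with trp:attributeName. In this sample, name and id are read from the response, while volume and price are taken from the original request. Because each response event already carries the attributes of the request that produced it, no join or pattern is needed to correlate the two streams.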
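
Applied to the streams in the question, the same idea might look like the sketch below. It is only an illustration under several assumptions: the endpoint URL, the sink.id value 'enrich-1', the intermediate EventRequestStream, the attribute types, and the shape of the JSON response (a top-level variable3 field) are all hypothetical and not taken from the question.

```siddhi
@App:name('EnrichViaHttpCall')

-- Incoming events to enrich (attribute names follow the question; types are assumed).
define stream EventInputStream (variable1 string, variable2 string);

-- Each event is POSTed to a hypothetical endpoint. The sink.id ties this
-- sink to the response source below.
@sink(type='http-call', publisher.url='http://localhost:8080/lookup',
      method='POST', sink.id='enrich-1',
      @map(type='json'))
define stream EventRequestStream (variable1 string, variable2 string);

-- variable1 and variable2 are copied from the originating request via trp:,
-- variable3 is read from the JSON response body (assumed to contain "variable3").
@source(type='http-call-response', sink.id='enrich-1',
        @map(type='json',
             @attributes(variable1='trp:variable1', variable2='trp:variable2',
                         variable3='$.variable3')))
define stream EventResponseStream (variable1 string, variable2 string, variable3 string);

-- Enriched output, logged here only for inspection.
@sink(type='log')
define stream EventOutputStream (variable1 string, variable2 string, variable3 string);

-- Forward every input event as an HTTP request.
from EventInputStream
select variable1, variable2
insert into EventRequestStream;

-- Each response already carries its own request attributes, so it can be
-- forwarded directly without a window, join, or pattern.
from EventResponseStream
select variable1, variable2, variable3
insert into EventOutputStream;
```

With this layout, bursts of events sent within the same second should not get mixed up, since the pairing of request and response is done per call by the source mapping rather than by arrival time.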