wso2siddhi

Is there a way in Siddhi to store app session data?


I have an input stream, InputStream, that takes input from Kafka. I use this stream to call an HTTP API, and then I have to send the results from the HTTP response to another Kafka topic using OutputStream.

app flow
InputStream (has userId) => RequestHttpStream => ResponseHTTPStream (doesn't have userId) => OutputStream (needs userId here)

The problem is that I need userId to send the event to OutputStream, but the HTTP response stream ResponseHTTPStream has no notion of userId. So I am looking for a way in Siddhi to store and retrieve key/value mappings. I read about tables in Siddhi, but they seem to be shared between all events. What I need is something like app session data that exists only for that session. Can I use partitions for this?

code


@source(type = 'kafka',
    topic.list = "user-entered-code", 
    bootstrap.servers = "kafka:9092", 
    @map(type = 'json', validate.json = "true", 
        @payload("""{"userId":"{{userId}}","promoCode":{{promoCode}}}""")))
define stream InputStream (userId string, promoCode string);

@sink(type = 'kafka', topic = "user-code-attribution-updated", bootstrap.servers = "kafka:9092", 
    @map(type = 'json', validate.json = "true", 
        @payload("""{"userId":"{{userId}}","attributes":{{attributes}}}""")))
define stream OutputStream (userId string, attributes object);

@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request",  publisher.url = "http://test-node-server:3031/search-attribution-id", 
    @map(type = 'json'))
define stream RequestHttpStream(attributionId string);

@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200", 
    @map(type = 'json'))
define stream ResponseHTTPStream (attributes object);

from InputStream
select promoCode as attributionId
insert into RequestHttpStream;

from ResponseHTTPStream
select 'need userId here' as userId, attributes as attributes
insert into OutputStream;


Solution

  • Your use case is to carry an attribute from the http-call sink over to the http-call-response source. This can be achieved with transport properties: the attributes of the event published by the sink can be accessed in the source mapping by referring to trp:<attribute name>.

    @sink(type = 'log')
    @sink(type = 'http-call', sink.id = "attribution-request",  publisher.url = "http://test-node-server:3031/search-attribution-id", 
        @map(type = 'json', 
            @payload("""{"attributionId":"{{attributionId}}"}""")))
    define stream RequestHttpStream(attributionId string, userId string);
    
    @sink(type = 'log')
    @source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200", 
        @map(type = 'json', 
           @attributes(attributes='attributes', userId='trp:userId')))
    define stream ResponseHTTPStream (attributes object, userId string);
    

    Please note, however, that you have to use a custom mapping (@payload) in the sink, since you do not want to send the userId to the HTTP endpoint.
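
    With userId now present on both RequestHttpStream and ResponseHTTPStream, the two queries from the question would presumably need to change along these lines. This is only a sketch based on the stream definitions above and has not been run against a live endpoint:

    -- Pass userId along with the request event; the custom @payload on the
    -- sink keeps it out of the HTTP request body.
    from InputStream
    select promoCode as attributionId, userId
    insert into RequestHttpStream;

    -- userId on the response stream is filled in by the source mapping,
    -- so it can be forwarded directly.
    from ResponseHTTPStream
    select userId, attributes
    insert into OutputStream;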