I have an input stream, InputStream, that takes input from Kafka. I use this stream to call an HTTP API. When the HTTP response arrives, I have to send the results to another Kafka topic using OutputStream.
App flow:
InputStream (has userId) => RequestHttpStream => ResponseHTTPStream (doesn't have userId) => OutputStream (needs userId here)
The problem is that I need userId to send the event to OutputStream, but the HTTP response stream, ResponseHTTPStream, has no notion of userId. So I am looking for a way in Siddhi to store and retrieve key/value mappings. I read about tables in Siddhi, but it seems they are shared between all events. What I need is something like app session data that exists only for that session. Can I use partitions for this?
Code:
@source(type = 'kafka',
topic.list = "user-entered-code",
bootstrap.servers = "kafka:9092",
@map(type = 'json', validate.json = "true",
@payload("""{"userId":"{{userId}}","promoCode":{{promoCode}}}""")))
define stream InputStream (userId string, promoCode string);
@sink(type = 'kafka', topic = "user-code-attribution-updated", bootstrap.servers = "kafka:9092",
@map(type = 'json', validate.json = "true",
@payload("""{"userId":"{{userId}}","attributes": {{attributes}}}""")))
define stream OutputStream (userId string, attributes object);
@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request", publisher.url = "http://test-node-server:3031/search-attribution-id",
@map(type = 'json'))
define stream RequestHttpStream(attributionId string);
@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200",
@map(type = 'json'))
define stream ResponseHTTPStream (attributes object);
from InputStream
select promoCode as attributionId
insert into RequestHttpStream;
from ResponseHTTPStream
select 'need userId here' as userId, attributes as attributes
insert into OutputStream;
Your use case is to carry an attribute from the http-call sink over to the http-call-response source. This can be achieved with transport properties: the attributes of the event published through the sink can be accessed in the source by referring to them as trp:<attribute name>.
@sink(type = 'log')
@sink(type = 'http-call', sink.id = "attribution-request", publisher.url = "http://test-node-server:3031/search-attribution-id",
@map(type = 'json',
@payload("""{"attributionId":"{{attributionId}}"}""")))
define stream RequestHttpStream(attributionId string, userId string);
@sink(type = 'log')
@source(type = 'http-call-response', sink.id = "attribution-request", http.status.code = "200",
@map(type = 'json',
@attributes(attributes='attributes', userId='trp:userId')))
define stream ResponseHTTPStream (attributes object, userId string);
However, please note that you have to use a custom mapping (@payload) in the sink, since you do not want to send userId to the HTTP endpoint.
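With those stream definitions, the two queries from the question would also need to pass userId through. A minimal sketch, assuming the stream and attribute names from the question are otherwise unchanged:

```siddhi
-- Forward userId alongside attributionId. The custom @payload on the
-- http-call sink keeps userId out of the HTTP request body.
from InputStream
select promoCode as attributionId, userId
insert into RequestHttpStream;

-- userId on ResponseHTTPStream is filled from the transport property
-- mapped in the http-call-response source.
from ResponseHTTPStream
select userId, attributes
insert into OutputStream;
```

The select order in each query follows the attribute order of the target stream definition.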