siddhi, nats.io, nats-streaming-server

Getting the NATS streaming event sequence number in Siddhi


I have a stream of tweets in text format (TwitterStream) and a stream of sentiments for each tweet (SentimentStream). The SentimentStream producer subscribes to the TwitterStream, runs a sentiment analysis on each tweet, and publishes a new message containing the result and the TwitterStream sequence number of that tweet.

I'm trying to join these two streams where SentimentStream.seq is equal to the sequence number of the tweet. The problem I'm having is that I cannot get a "handle" on the sequence number from the TwitterStream.

I've been trying to find a way to get at event "metadata" that might expose the position / sequence number of the event.

@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentiment for that tweet')

@sink(type = 'log', 
    @map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);

@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);

@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);

-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;

Any help will be appreciated.


Solution

  • At the time of writing, the Siddhi NATS source does not expose the message sequence number. We have opened a feature improvement request at https://github.com/siddhi-io/siddhi-io-nats/issues/25 to provide this support.
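
For reference, once the sequence number is available as a regular stream attribute, the join itself could look roughly like the sketch below. This is only an illustration under stated assumptions: the `seq` attribute on TwitterStream is hypothetical (how it gets populated depends on the feature request above), the source annotations are left out for brevity, and the one-minute time windows are an arbitrary choice for how long each side waits for its match.

```
@App:name('SentimentJoinSketch')

@sink(type = 'log')
define stream AggregateStream (tweet string, sentiment string);

define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);

-- ASSUMPTION: 'seq' is a hypothetical attribute carrying the NATS sequence number of the tweet.
define stream TwitterStream (seq int, tweet string);

-- A stream-to-stream join needs a window on each side so events are retained until the match arrives.
@info(name = 'JoinOnSequenceNumber')
from SentimentStream#window.time(1 min) as S
    join TwitterStream#window.time(1 min) as T
    on S.seq == T.seq
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;
```

The windows should be at least as long as the sentiment analysis takes, since a tweet that has expired from its window before the sentiment arrives will not be joined.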