Essentially what the subject says.
I'm wondering if JetStream can be queried in a way that allows us to refetch either the last 15 messages of subject "foo.*" or the messages that JetStream received on subject "foo.*" in the last 1.5 seconds.
If that's possible, any code-samples or links to code-samples are appreciated.
There is a way to achieve the time-related retrieval in JetStream.
now := time.Now()
oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)
js, _ := nc.JetStream()
sub, err := js.SubscribeSync(
"foo.*",
nats.OrderedConsumer(),
nats.StartTime(oneAndHalfSecondAgo),
)
for {
msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones
if err != nil {
log.Fatal(err)
}
// 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here
// 2. if the message is before now we can push it in an array here
}
Note that this technique, though useful, is quite inefficient because we grab messages one by one.
We could modify it using .Subscribe() (which is asynchronous) but then we would have a different problem:
We would overpull from JetStream past the present moment and then we would have to make sure that the buffered messages that we grabbed will indeed be returned back to JetStream. As far as I can tell there is no configuration option to tell JetStream about the "MaxTime".
As for how to "get the latest N-Messages" one could modify the above code-sample so that he will get a reasonably high chunk of messages (p.e. all messages in the last 5secs or 10secs or 30secs) and after getting all the messages up to the present moment he can then grab the latest 'N' messages.
This technique is not ideal ofcourse but there doesn't seem to be another way to do this - at least not at the time of this writing.