I am playing a bit with the NATS streaming and I have a problem with the subscriber rate limiting. When I set the max in flight to 1 and the timeout to 1 second and I have a consumer which is basically a Thread.sleep(1000) then I get multiple times the same event. I thought by limiting the in flight and using a manual ack this should not happen. How can I get exatly once delivery on very slow consumers?
case class EventBus[I, O](inputTopic: String, outputTopic: String, connection: Connection, eventProcessor: StatefulEventProcessor[I, O]) {
// the event bus could be some abstract class while the `Connection` coulbd be injected using DI
val substritionOptions: SubscriptionOptions = new SubscriptionOptions.Builder()
.setManualAcks(true)
.setDurableName("foo")
.setMaxInFlight(1)
.setAckWait(1, TimeUnit.SECONDS)
.build()
if (!inputTopic.isEmpty) {
connection.subscribe(inputTopic, new MessageHandler() {
override def onMessage(m: Message) {
m.ack()
try {
val event = eventProcessor.deserialize(m.getData)
eventProcessor.onEvent(event)
} catch {
case any =>
try {
val command = new String(m.getData)
eventProcessor.onCommand(command)
} catch {
case any => println(s"de-serialization error: $any")
}
} finally {
println("got event")
}
}
}, substritionOptions)
}
if (!outputTopic.isEmpty) {
eventProcessor.setBus(e => {
try {
connection.publish(outputTopic, eventProcessor.serialize(e))
} catch {
case ex => println(s"serialization error $ex")
}
})
}
}
abstract class StatefulEventProcessor[I, O] {
private var bus: Option[O => Unit] = None
def onEvent(event: I): Unit
def onCommand(command: String): Unit
def serialize(o: O): Array[Byte] =
SerializationUtils.serialize(o.asInstanceOf[java.io.Serializable])
def deserialize(in: Array[Byte]): I =
SerializationUtils.deserialize[I](in)
def setBus(push: O => Unit): Unit = {
if (bus.isDefined) {
throw new IllegalStateException("bus already set")
} else {
bus = Some(push)
}
}
def push(event: O) =
bus.get.apply(event)
}
EventBus("out-1", "out-2", sc, new StatefulEventProcessor[String, String] {
override def onEvent(event: String): Unit = {
Thread.sleep(1000)
push("!!!" + event)
}
override def onCommand(command: String): Unit = {}
})
(0 until 100).foreach(i => sc.publish("out-1", SerializationUtils.serialize(s"test-$i")))
First, there is no exactly once (re)delivery guarantee with NATS Streaming. What MaxInflight gives you, is the assurance that the server will not send new messages to the subscriber until the number of unacknowledged messages is below that number. So in case of MaxInflight(1), you are asking the server to send the next new message only after receiving the ack from the previously delivered message. However, this does not block redelivery of unacknowledged messages.
The server has no guarantee or no knowledge that a message is actually received by a subscriber. This is what the ACK is for, to let the server know that the message was properly processed by the subscriber. If the server would not honor redelivery (even when MaxInflight is reached), then a "lost" message would stall your subscription for ever. Keep in mind that NATS Streaming server and clients are not directly connected to each other with a TCP connection (they are both connected to a NATS server, aka gnatsd).