I have the following stream:
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
It works fine for a while, and I can consume the messages populated on the Kafka topic. But from time to time, apparently at a random interval, there are no more messages published, and this code is not logging any errors (printAndByeBye will print the message passed and terminates the actor system.) After restarting the app, the messages continue to flow.
Any idea on how to know what is going on here?
Edit: I put Kamon on it and I could see the following behavior:
It looks like something stopped without informing the stream should stop, but I don't know how to make it explicit and stop the stream.
The stream was not failing, but the TCP stream went idle as the device publishing data stop sending data after a while without dropping the connection. Instead of using the simpler:
TCP().outgoingConnection(bsAddress, bsPort)
I end up using:
def outgoingConnection(
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
halfClose: Boolean = true,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???
so
Tcp().outgoingConnection(bsAddress, bsPort)
became
val connectTimeout: Duration = 1 second
val idleTimeout: Duration = 2 second
Tcp().outgoingConnection(
remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
connectTimeout = connectTimeout,
idleTimeout = idleTimeout
)
by informing idleTimeout, the follow start failing and another flow could be restarted.