I have the following stream:
    Source(IndexedSeq(ByteString.empty))
      .via(
        Tcp().outgoingConnection(bsAddress, bsPort)
          .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
          .map(_.utf8String)
      )
      .map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
      .runWith(
        Producer.plainSink(
          ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
            .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
        )
      ).onComplete {
        case Success(Done) => printAndByeBye("Stream ends successfully")
        case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
      }
It works fine for a while, and I can consume the messages published to the Kafka topic. But from time to time, at seemingly random intervals, messages stop being published, and this code does not log any errors (printAndByeBye prints the message passed to it and terminates the actor system). After restarting the app, the messages flow again.
Any ideas on how to find out what is going on here?
Edit: I put Kamon on it and I could see the following behavior:
It looks like something stopped without signaling to the stream that it should stop, but I don't know how to make this explicit and stop the stream.
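
One way a silent stall like this might be made explicit is the idleTimeout operator, which fails the stream if no element passes within a given duration. The following is only a minimal sketch, not a confirmed fix for the pipeline above: it assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer), uses Source.maybe as a stand-in for a TCP connection that has gone quiet without closing, and the names IdleTimeoutSketch, runOnce and the 2-second timeout are made up for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; none of these names come from the original code.
object IdleTimeoutSketch {

  def runOnce(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("idle-timeout-sketch")
    try {
      // Stand-in for the TCP flow: Source.maybe emits nothing and does not complete
      // while its promise is left untouched, like a connection that went silent.
      val silentUpstream = Source.maybe[ByteString]

      val done = silentUpstream
        .idleTimeout(2.seconds) // assumed value; fails the stream when no element passes in time
        .map(_.utf8String)
        .runWith(Sink.ignore)

      // The stream should fail well before this outer 10-second wait expires.
      Try(Await.result(done, 10.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(runOnce())
}
```

In the pipeline from the question, the same operator could presumably be placed right after the Framing.delimiter stage, so that a quiet connection ends up in the Failure branch of onComplete instead of hanging. Whether a timeout is appropriate depends on how long the source can legitimately stay silent.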