Here is code I wrote to insert documents into a DB:
import com.typesafe.scalalogging.LazyLogging
import org.mongodb.scala.result.InsertOneResult
import org.mongodb.scala.{Document, MongoClient, MongoCollection, MongoDatabase, Observer, SingleObservable}
/** Process-wide access point for the `cdata` collection of the `myDB` database.
  *
  * A single [[MongoClient]] is created when this object is first initialized and is
  * shared by every call to [[insertDocument]].
  */
object MongoFactory extends LazyLogging {

  /** Connection string used to build the shared client. */
  val uri: String = "mongodb+srv://uname:connectionString?retryWrites=true&w=majority"

  /** Shared client, created once at object initialization. */
  val client: MongoClient = MongoClient(uri)

  /** Handle to the `myDB` database obtained from [[client]]. */
  val db: MongoDatabase = client.getDatabase("myDB")

  /** Handle to the `cdata` collection obtained from [[db]]. */
  val collection: MongoCollection[Document] = db.getCollection("cdata")

  /** Submits `document` for insertion into [[collection]] and logs the outcome.
    *
    * The outcome is delivered through the subscribed observer's callbacks rather than
    * through this method's result, so callers receive no success/failure signal here.
    *
    * @param document the document to insert
    */
  def insertDocument(document: Document): Unit =
    collection
      .insertOne(document)
      .subscribe(new Observer[InsertOneResult] {
        override def onNext(result: InsertOneResult): Unit = logger.info(s"onNext: $result")

        // Failures are logged at ERROR together with the throwable so the stack trace
        // is preserved and the event is not hidden among routine INFO output.
        override def onError(e: Throwable): Unit = logger.error(s"onError: $e", e)

        override def onComplete(): Unit = logger.info("onComplete")
      })
}
After executing approx 10'000 inserts (calling MongoFactory.insertDocument) the following exception is thrown:
Exception in monitor thread while connecting to server o-test-cluster-shard-00-00.dsfs.mongodb.net:27017
com.mongodb.MongoSocketReadException: Exception receiving message
at com.mongodb.internal.connection.InternalStreamConnection.translateReadException(InternalStreamConnection.java:637) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.receiveMessageWithAdditionalTimeout(InternalStreamConnection.java:516) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:356) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.InternalStreamConnection.receive(InternalStreamConnection.java:305) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:218) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144) ~[mongodb-driver-core-4.1.0.jar:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.io.IOException: Connection reset by peer
at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:na]
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:na]
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[na:na]
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:245) ~[na:na]
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[na:na]
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358) ~[na:na]
at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.readFromChannel(TlsChannelImpl.java:356) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.readFromChannel(TlsChannelImpl.java:343) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.readAndUnwrap(TlsChannelImpl.java:622) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.read(TlsChannelImpl.java:235) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.tlschannel.ClientTlsChannel.read(ClientTlsChannel.java:168) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.tlschannel.async.AsynchronousTlsChannelGroup.readHandlingTasks(AsynchronousTlsChannelGroup.java:594) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.tlschannel.async.AsynchronousTlsChannelGroup.doRead(AsynchronousTlsChannelGroup.java:559) ~[mongodb-driver-core-4.1.0.jar:na]
at com.mongodb.internal.connection.tlschannel.async.AsynchronousTlsChannelGroup.lambda$processRead$5(AsynchronousTlsChannelGroup.java:471) ~[mongodb-driver-core-4.1.0.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
... 1 common frames omitted
23:29:53.755 [cluster-ClusterId{value='5fc20d1b2e933e515a87a4ab', description='null'}-technical-test-cluster-shard-00-00.zyqyw.mongodb.net:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:39, serverValue:923013}] to-test-cluster-shard-00-00.dsfs.mongodb.net:27017
The way I manage the connection to MongoDB can be much improved. I believe the exception is thrown when the MongoDB server resets the connection after a period of time and the client
is not aware of this: the code val client: MongoClient = MongoClient(uri)
is executed only once, so if the connection is reset, client
should be re-initialized to create a new connection.
Should I change the code to something like the update below? It re-initializes the connection when an exception is thrown:
import com.typesafe.scalalogging.LazyLogging
import org.mongodb.scala.result.InsertOneResult
import org.mongodb.scala.{Document, MongoClient, MongoCollection, MongoDatabase, Observer, SingleObservable}
/** Process-wide access point for the `cdata` collection of the `myDB` database.
  *
  * The client, database and collection handles are mutable so that they can be
  * rebuilt when submitting an insert throws; see [[insertDocument]].
  *
  * NOTE(review): the handles are reassigned without synchronization — confirm whether
  * `insertDocument` can be called from several threads at once.
  */
object MongoFactory extends LazyLogging {
  import scala.util.control.NonFatal

  /** Connection string used to build (and rebuild) the client. */
  val uri: String = "mongodb+srv://uname:connectionString?retryWrites=true&w=majority"

  /** Current client; replaced by `reconnect()` after a failed submission. */
  var client: MongoClient = MongoClient(uri)

  /** Handle to the `myDB` database obtained from the current [[client]]. */
  var db: MongoDatabase = client.getDatabase("myDB")

  /** Handle to the `cdata` collection obtained from the current [[db]]. */
  var collection: MongoCollection[Document] = db.getCollection("cdata")

  /** Submits `document` for insertion and logs the outcome via observer callbacks.
    *
    * If submitting the insert throws a non-fatal exception, the client and its handles
    * are rebuilt and the insert is submitted once more; an exception thrown by that
    * second attempt propagates to the caller.
    *
    * NOTE(review): the insert is subscribed to rather than awaited, so driver failures
    * are presumably reported through `onError` and would not reach this `catch` —
    * confirm against the driver documentation before relying on this retry.
    *
    * @param document the document to insert
    */
  def insertDocument(document: Document): Unit =
    try submitInsert(document)
    catch {
      // NonFatal lets VM errors and thread interruption propagate instead of
      // triggering a reconnect.
      case NonFatal(e) =>
        logger.warn(s"Insert could not be submitted, re-creating MongoClient: $e", e)
        reconnect()
        submitInsert(document)
    }

  /** Builds a fresh client and handles, then closes the client that was replaced. */
  private def reconnect(): Unit = {
    val stale = client
    client = MongoClient(uri)
    db = client.getDatabase("myDB")
    collection = db.getCollection("cdata")
    // Close the replaced client so its resources are released; a failure to close
    // must not prevent the retry from running.
    try stale.close()
    catch {
      case NonFatal(e) => logger.warn(s"Failed to close replaced MongoClient: $e", e)
    }
  }

  /** Issues the insert against the current [[collection]] and subscribes a logging observer. */
  private def submitInsert(document: Document): Unit =
    collection
      .insertOne(document)
      .subscribe(new Observer[InsertOneResult] {
        override def onNext(result: InsertOneResult): Unit = logger.info(s"onNext: $result")

        // Failures are logged at ERROR together with the throwable so the stack trace
        // is preserved.
        override def onError(e: Throwable): Unit = logger.error(s"onError: $e", e)

        override def onComplete(): Unit = logger.info("onComplete")
      })
}
What are better solutions to this problem? How can I re-create the connection if a connection exception is thrown?