0

Here is code I wrote to insert documents into a DB:

import com.typesafe.scalalogging.LazyLogging
import org.mongodb.scala.result.InsertOneResult
import org.mongodb.scala.{Document, MongoClient, MongoCollection, MongoDatabase, Observer, SingleObservable}

/** Holds one shared MongoDB client and exposes an insert into the `cdata` collection of `myDB`. */
object MongoFactory extends LazyLogging {

  // NOTE(review): the URI embeds credentials; presumably a placeholder — confirm it is loaded from
  // configuration rather than committed in real deployments.
  val uri: String = "mongodb+srv://uname:connectionString?retryWrites=true&w=majority"
  val client: MongoClient = MongoClient(uri)
  val db: MongoDatabase = client.getDatabase("myDB")
  val collection: MongoCollection[Document] = db.getCollection("cdata")

  /**
   * Issues an insert of `document` into [[collection]] and subscribes an observer that logs
   * every signal it receives.
   *
   * @param document the document to insert
   */
  def insertDocument(document: Document): Unit =
    collection.insertOne(document).subscribe(new Observer[InsertOneResult] {

      override def onNext(result: InsertOneResult): Unit = logger.info(s"onNext: $result")

      // A failed insert is an error, not routine information: log at ERROR and pass the
      // throwable along so the stack trace is not lost.
      override def onError(e: Throwable): Unit = logger.error(s"onError: $e", e)

      override def onComplete(): Unit = logger.info("onComplete")
    })

}

After executing approx 10'000 inserts (calling MongoFactory.insertDocument) the following exception is thrown:

Exception in monitor thread while connecting to server o-test-cluster-shard-00-00.dsfs.mongodb.net:27017
com.mongodb.MongoSocketReadException: Exception receiving message
    at com.mongodb.internal.connection.InternalStreamConnection.translateReadException(InternalStreamConnection.java:637) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.receiveMessageWithAdditionalTimeout(InternalStreamConnection.java:516) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:356) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.receive(InternalStreamConnection.java:305) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:218) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144) ~[mongodb-driver-core-4.1.0.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.io.IOException: Connection reset by peer
    at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:na]
    at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:na]
    at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[na:na]
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:245) ~[na:na]
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[na:na]
    at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358) ~[na:na]
    at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.readFromChannel(TlsChannelImpl.java:356) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.readFromChannel(TlsChannelImpl.java:343) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.readAndUnwrap(TlsChannelImpl.java:622) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.tlschannel.impl.TlsChannelImpl.read(TlsChannelImpl.java:235) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.tlschannel.ClientTlsChannel.read(ClientTlsChannel.java:168) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.tlschannel.async.AsynchronousTlsChannelGroup.readHandlingTasks(AsynchronousTlsChannelGroup.java:594) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.tlschannel.async.AsynchronousTlsChannelGroup.doRead(AsynchronousTlsChannelGroup.java:559) ~[mongodb-driver-core-4.1.0.jar:na]
    at com.mongodb.internal.connection.tlschannel.async.AsynchronousTlsChannelGroup.lambda$processRead$5(AsynchronousTlsChannelGroup.java:471) ~[mongodb-driver-core-4.1.0.jar:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    ... 1 common frames omitted
23:29:53.755 [cluster-ClusterId{value='5fc20d1b2e933e515a87a4ab', description='null'}-technical-test-cluster-shard-00-00.zyqyw.mongodb.net:27017] INFO  org.mongodb.driver.connection - Opened connection [connectionId{localValue:39, serverValue:923013}] to-test-cluster-shard-00-00.dsfs.mongodb.net:27017

How I manage the connection to MongoDB can be much improved upon. I believe the exception is thrown when the MongoDB server resets the connection after a period of time and the client is not aware of this: the code `val client: MongoClient = MongoClient(uri)` is executed only once, so if the connection is reset, the client should be re-initialized to create a new connection.

Should I convert to something like the update below, which reinitializes the connection when an exception is thrown?

import com.typesafe.scalalogging.LazyLogging
import org.mongodb.scala.result.InsertOneResult
import org.mongodb.scala.{Document, MongoClient, MongoCollection, MongoDatabase, Observer, SingleObservable}

/**
 * Holds one shared MongoDB client and exposes an insert into the `cdata` collection of `myDB`.
 * If issuing an insert throws, the client is re-created once and the insert is retried.
 */
object MongoFactory extends LazyLogging {

  // NOTE(review): the URI embeds credentials; presumably a placeholder — confirm it is loaded from
  // configuration rather than committed in real deployments.
  val uri: String = "mongodb+srv://uname:connectionString?retryWrites=true&w=majority"
  // These stay `var` because `reconnect()` reassigns them.
  // NOTE(review): reassignment is not synchronized — confirm callers use a single thread.
  var client: MongoClient = MongoClient(uri)
  var db: MongoDatabase = client.getDatabase("myDB")
  var collection: MongoCollection[Document] = db.getCollection("cdata")

  /**
   * Issues an insert of `document` into [[collection]]. When the call itself throws a non-fatal
   * exception, the failure is logged, the client is re-created and the insert is attempted one
   * more time; an exception from that second attempt propagates to the caller.
   *
   * The catch clause only sees exceptions thrown by the call; anything delivered to the
   * observer's `onError` is logged and nothing more.
   *
   * @param document the document to insert
   */
  def insertDocument(document: Document): Unit =
    try subscribeInsert(document)
    catch {
      // NonFatal lets VM errors, interrupts and control-flow throwables propagate untouched.
      case scala.util.control.NonFatal(e) =>
        logger.warn(s"insert could not be issued, re-creating client: $e", e)
        reconnect()
        subscribeInsert(document)
    }

  /** Subscribes a logging observer to the insert of `document` into the current [[collection]]. */
  private def subscribeInsert(document: Document): Unit =
    collection.insertOne(document).subscribe(new Observer[InsertOneResult] {

      override def onNext(result: InsertOneResult): Unit = logger.info(s"onNext: $result")

      // Log at ERROR and keep the throwable so the stack trace is not lost.
      override def onError(e: Throwable): Unit = logger.error(s"onError: $e", e)

      override def onComplete(): Unit = logger.info("onComplete")
    })

  /** Replaces [[client]], [[db]] and [[collection]] with fresh instances and closes the old client. */
  private def reconnect(): Unit = {
    val stale = client
    client = MongoClient(uri)
    db = client.getDatabase("myDB")
    collection = db.getCollection("cdata")
    // Close the replaced client so its resources are released instead of leaked; a failure to
    // close must not prevent the retry.
    try stale.close()
    catch {
      case scala.util.control.NonFatal(closeError) =>
        logger.warn(s"failed to close previous client: $closeError", closeError)
    }
  }

}

What are better solutions to this problem? How can I re-create the connection if a connection exception is thrown?

blue-sky
  • 51,962
  • 152
  • 427
  • 752
  • What problem do you think you have? I see a log entry informing you of a network error, this sort of thing is normal over internet. – D. SM Nov 29 '20 at 16:49
  • The exception you are showing is from monitoring thread, it isn't produced by any operation issued by your application. – D. SM Nov 29 '20 at 16:51
  • @D.SM How should my application behave when this exception is thrown? As the DB server has reset the connection (Caused by: java.io.IOException: Connection reset by peer) then shouldn't my application try to create a new connection? From https://stackoverflow.com/questions/8658118/when-is-java-io-ioexceptionconnection-reset-by-peer-thrown ioexception is caused by the server my application is connecting to. – blue-sky Nov 29 '20 at 16:59
  • The monitoring thread is managed by the driver. No action from you is needed or possible. – D. SM Nov 29 '20 at 17:03
  • @D.SM the data has not been inserted? Therefore I should take some action to try and persist the data to DB? – blue-sky Nov 29 '20 at 17:05
  • If an insert failed you should have an exception relating to that? – D. SM Nov 29 '20 at 18:06

0 Answers0