I was running a Spark job that was supposed to write a DataFrame to a MongoDB collection. It ran for about an hour and then suddenly stopped with the following exception:

Exception in thread "main" org.apache.spark.SparkException: 
Job aborted due to stage failure: Task 20273 in stage 29.0 failed 1 times, most recent failure: 
Lost task 20273.0 in stage 29.0 (TID 21088, localhost, executor driver): 
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.connection.SocketStream.read(SocketStream.java:88)
    at com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:494)
    at com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:224)
    at com.mongodb.connection.UsageTrackingInternalConnection.receiveMessage(UsageTrackingInternalConnection.java:96)
    at com.mongodb.connection.DefaultConnectionPool$PooledConnection.receiveMessage(DefaultConnectionPool.java:440)
    at com.mongodb.connection.WriteCommandProtocol.receiveMessage(WriteCommandProtocol.java:262)
    at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:104)
    at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:67)
    at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:37)
    at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
    at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
    at com.mongodb.connection.DefaultServerConnection.insertCommand(DefaultServerConnection.java:118)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$2.executeWriteCommandProtocol(MixedBulkWriteOperation.java:465)
    at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:656)
    at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:411)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:177)
    at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:422)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:413)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
    at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
    at com.mongodb.Mongo.execute(Mongo.java:845)
    at com.mongodb.Mongo$2.execute(Mongo.java:828)
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:338)
    at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:322)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:119)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:118)
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
    at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:118)
    at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Only a portion of the data was written to MongoDB. I'm using Scala with the MongoDB Spark Connector. How can I solve this issue?

Edit

Here is the setup code for the Spark session:

val spark: SparkSession = SparkSession.builder()
                        .appName("Spark Movie Similarities")
                        .master("local[*]")
                        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
                        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.similarities")
                        .getOrCreate()

And here is how I write results back to MongoDB:

MongoSpark.save(movieSimilarities)

Where movieSimilarities is a Spark DataFrame. Nothing too special here, and the task fails only after a certain number of records have already been successfully written to MongoDB.
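
For reference, the same write can also be expressed through the connector's DataFrameWriter overload, which makes the target collection explicit. This is only a sketch: the collection name "similarities" is taken from the output URI above, and it assumes a connector version that supports MongoSpark.save on a DataFrameWriter.

import com.mongodb.spark.MongoSpark

// Append-inserts the DataFrame into the "similarities" collection configured
// in spark.mongodb.output.uri, just like the plain MongoSpark.save call above.
MongoSpark.save(
  movieSimilarities.write
    .option("collection", "similarities")
    .mode("append")
)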

  • Could you please provide the code snippet which generated this error? – Pavel Oct 21 '17 at 17:14
  • Possible duplicate of [Mongodb: `com.mongodb.MongoSocketReadException: Prematurely reached end of stream` with morphia](https://stackoverflow.com/questions/42150578/mongodb-com-mongodb-mongosocketreadexception-prematurely-reached-end-of-strea) – Pavel Oct 21 '17 at 17:14
  • I saw this post, but I didn't get how those answers could solve my problem, as in my case it's Spark-related. – Daniil Andreyevich Baunov Oct 21 '17 at 17:21
  • Please share your code snippet; agreed, your context is slightly different. – Pavel Oct 21 '17 at 17:24
  • Have you tried applying maxConnectionIdleTime, as suggested in the duplicate reference? It's more likely you are doing a long write and Mongo just timed out the connection ... – Pavel Oct 21 '17 at 17:31
  • I'm not sure how I can set that property from the MongoDB Spark Connector (a possible way is sketched below). – Daniil Andreyevich Baunov Oct 21 '17 at 17:49
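
Based on the comments, one possible approach (untested, and assuming the connector passes connection-string options through to the underlying MongoDB Java driver) would be to set the idle timeout via the URI itself: the driver's maxConnectionIdleTime setting corresponds to the maxIdleTimeMS connection-string option, which can be appended to the output URI in the Spark config. The 600000 ms value below is only illustrative.

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .appName("Spark Movie Similarities")
  .master("local[*]")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
  // maxIdleTimeMS is the connection-string form of maxConnectionIdleTime;
  // 600000 ms (10 minutes) is an illustrative value, not a recommendation.
  .config("spark.mongodb.output.uri",
          "mongodb://127.0.0.1/movie_db.similarities?maxIdleTimeMS=600000")
  .getOrCreate()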

0 Answers