
I am reading a MongoDB collection using newAPIHadoopRDD in Java. First I create a JavaSparkContext object using the following class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkLauncher {
    public JavaSparkContext javaSparkContext;

    public SparkLauncher() {
        javaSparkContext = null;
    }

    public JavaSparkContext getSparkContext() {
        if (javaSparkContext == null) {
            System.out.println("SPARK INIT...");
            try {
                System.setProperty("spark.executor.memory", "2g");
                Runtime runtime = Runtime.getRuntime();
                runtime.gc();
                int numOfCores = runtime.availableProcessors();
                numOfCores = 3; // override: use 3 local threads
                SparkConf conf = new SparkConf();
                conf.setMaster("local[" + numOfCores + "]");
                conf.setAppName("WL");
                conf.set("spark.serializer",
                        "org.apache.spark.serializer.KryoSerializer");
                javaSparkContext = new JavaSparkContext(conf);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        return javaSparkContext;
    }

    public void closeSparkContext() {
        javaSparkContext.stop();
        javaSparkContext.close();
        javaSparkContext = null;
    }
}

Then, in another class, I read the MongoDB collection:

SparkLauncher sc = new SparkLauncher();
JavaSparkContext javaSparkContext = sc.getSparkContext();

try {
    interactions = javaSparkContext.newAPIHadoopRDD(mongodbConfig,
            MongoInputFormat.class, Object.class, BSONObject.class);
} catch (Exception e) {
    System.out.print(e.getMessage());
}
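
For completeness, mongodbConfig is just a Hadoop Configuration that tells mongo-hadoop which collection to read, and interactions is a JavaPairRDD<Object, BSONObject>. A minimal sketch of that setup (the connection URI here is illustrative, not my real one):

import org.apache.hadoop.conf.Configuration;

// Minimal sketch: point mongo-hadoop at the source collection.
// The URI below is only an example.
Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.input.uri",
        "mongodb://localhost:27017/mydb.mycollection");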

This code creates a lot of threads that read the collection's splits. Afterwards, I close the JavaSparkContext object:

javaSparkContext.close();
sc.closeSparkContext();
System.gc();

All threads are still alive, however, and the memory is not released. This looks like a memory leak and a thread leak. Is this because of the newAPIHadoopRDD method? Is there any way to get rid of these threads?

Here is a snapshot of some of the threads that are still alive: [screenshot of the thread list]

Here is the memory usage of the program, as shown in JConsole: [JConsole memory chart]

And finally, the leak suspect reported by the Eclipse Memory Analyzer: [Eclipse Memory Analyzer leak suspects report]

Morteza Mashayekhi
  • How do you know that *"All threads are still alive and the memory is not released"*? How do you measure/trace it? A few screenshots could help to see what you see and refer to. – Jacek Laskowski Dec 24 '15 at 11:25

3 Answers


There seems to be a connection leakage problem in mongo-hadoop. I ran into the same problem after running sample code that streams data from MongoDB.

It seems to be fixed in the most recent version, 1.4.2, which works fine for me in the sample code. Change your Maven dependency to:

<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>1.4.2</version>
</dependency>
zoran jeremic

I don't use MongoDB with Spark and very likely can't fully answer your question, but I've got a few comments that may lead to a solution.

  • The line System.setProperty("spark.executor.memory", "2g") has no effect in your Spark environment, since you use local mode, where the one and only executor gets the amount of memory you assign to the application at startup (and it can never be changed afterwards).

You'd be better off removing the line or switching to another Spark deployment environment, e.g. Standalone, YARN or Mesos (see the sketch at the end of this answer).

  • The same comment applies to conf.set("spark.executor.instances", "10"), since there can only be up to runtime.availableProcessors() (i.e. numOfCores) threads to launch tasks. You're using local mode, after all, where there is only a single JVM with up to Runtime.getRuntime().availableProcessors() threads for task execution.

The threads that are used by Spark will eventually be released. The thread pool they belong to is shut down as part of SparkContext.stop (I don't see it being called in your example, but since I'm using the Scala API there can be differences).
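
To make those points concrete, here is a minimal Java sketch of the local-mode setup I have in mind (the app name and serializer are copied from your code; the launch-time memory flag and the try/finally shutdown are my assumptions, not your code):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Local mode: a single JVM. Give it heap at launch
// (e.g. spark-submit --driver-memory 2g, or -Xmx2g),
// not via spark.executor.memory from inside the application.
SparkConf conf = new SparkConf()
        .setMaster("local[3]")   // up to 3 task threads in this one JVM
        .setAppName("WL")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext jsc = new JavaSparkContext(conf);
try {
    // ... run your jobs ...
} finally {
    jsc.stop();   // shuts down the scheduler and its thread pools
}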

Jacek Laskowski

I did another experiment. This time I wrote simple code that reads a Mongo collection with the same method in a loop of 15 iterations. At the end of the loop I also call System.gc(), just in case.
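
The loop was roughly like this (it reuses the mongodbConfig from my question; the count() is only there to force the splits to be read):

import org.apache.spark.api.java.JavaPairRDD;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;

// Rough sketch of the experiment: read the same collection 15 times.
for (int i = 0; i < 15; i++) {
    JavaPairRDD<Object, BSONObject> rdd = javaSparkContext.newAPIHadoopRDD(
            mongodbConfig, MongoInputFormat.class, Object.class, BSONObject.class);
    rdd.count();   // force the splits to be read
}
System.gc();       // called at the end of the loop, just in case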

Here is the JConsole output for this code, which shows threads accumulating during the run: [JConsole thread count chart]

I also implemented the code using the MongoDB API directly, without newAPIHadoopRDD, on another collection (see the chart below):

[JConsole memory chart for the driver-only version]

After a while the memory usage levels off. But look at the memory usage and threads when I used newAPIHadoopRDD to read the same collection:

[JConsole memory and thread charts for the newAPIHadoopRDD version]
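
For reference, the driver-only version was essentially a plain cursor scan along these lines (host, database and collection names are illustrative):

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

// Sketch of the comparison code using the MongoDB Java driver directly
// (host, database and collection names are illustrative).
MongoClient client = new MongoClient("localhost", 27017);
MongoCollection<Document> collection =
        client.getDatabase("mydb").getCollection("mycollection");

long count = 0;
for (Document doc : collection.find()) {
    count++;   // just iterate over the documents
}
System.out.println("documents read: " + count);
client.close();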

Morteza Mashayekhi