
I am launching a cross-validation study on 50 containers of a YARN cluster. The data are about 600,000 lines.

The job works well most of the time, but it uses a lot of RAM and CPU on the driver server of the cluster (the machine where the job is started): 3 out of 4 CPU cores. However, I cannot use that many resources, as this server is shared by several people.

My questions are:

  1. Why is my code using so much resources on the driver?
  2. How could I modify it so that it consumes fewer resources on the driver and more on cluster nodes?

I do not know Spark very well, but my bet for the first question is that I should use RDDs more than Arrays and ParArrays, but I cannot figure out how...

Here is my code:

val conf = new SparkConf()
  .setMaster("yarn-client")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.amadeus.ssp.tools.SSPKryoRegistrator")
val sc: SparkContext = new SparkContext(conf)

val data = sc.textFile("...").map(some pre-treatment...)

// Parameters
val numModels = Array(5)
val trainingRatioMajMin = 0.7
// Tree Ensemble
val numTrees = Array(50)
val maxDepth = Array(30)
val maxBins = Array(100)
// RF
val featureSubsetStrategy = Array("sqrt")
val subsamplingRate = Array(1.0)

// Class for model
// (instances are captured in the RDD closures below, so the class must be serializable)
class Model(model: Array[RandomForestModel]) extends Serializable {
  def predict(data: RDD[Vector]): RDD[Double] = {
    data.map(p => predict(p))
  }
  def predictWithLabels(data: RDD[LabeledPoint]): RDD[(Double, Double)] = {
    data.map(p => (p.label, predict(p.features)))
  }
  def predict(point: Vector): Double = {
    model.map(m => m.predict(point)).sum / model.length
  }
}

val CV_params: Array[Array[Any]] = {
  for (a <- numTrees; b <- maxDepth; c <- maxBins; d <- featureSubsetStrategy;
       e <- subsamplingRate; f <- numModels) yield Array(a, b, c, d, e, f)
}

// Sampling dataset
def sampling(dataset: RDD[LabeledPoint], fraction: Double, numSamples: Int): (Array[RDD[LabeledPoint]], RDD[LabeledPoint]) = {
  logInfo("Begin Sampling")
  val dataset_maj = dataset.filter(_.label == 0.0)
  val dataset_min = dataset.filter(_.label == 1.0)
  dataset_maj.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

  val data = ((1 to numSamples).map { sample =>
    dataset_maj.sample(false, fraction)
  }.toArray, dataset_min)
  dataset_maj.unpersist()
  logInfo("End Sampling")
  data
}

// Train the model
def classificationModel(params: Array[Any], training: RDD[LabeledPoint]): Model = {
  val (samples, data_min) = sampling(training, fraction, params(5).asInstanceOf[Int])
  data_min.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
  val models = samples.par.map { sample =>
    val strategy = new Strategy(Algo.Classification, org.apache.spark.mllib.tree.impurity.Gini, params(1).asInstanceOf[Int],
      numClasses, params(2).asInstanceOf[Int], QuantileStrategy.Sort, categoricalFeaturesInfo, 1, 0.0, 256, params(4).asInstanceOf[Double], false, 10)
    val model = RandomForest.trainClassifier(sample ++ data_min, strategy, params(0).asInstanceOf[Int], params(3).asInstanceOf[String], 0)
    logInfo(s"RF - totalNumNodes: ${model.totalNumNodes} - numTrees: ${model.numTrees}")
    model
  }.toArray
  data_min.unpersist()
  logInfo("RF: End RF training")
  new Model(models)
}


///// Cross-validation
val cv_data: Array[(RDD[LabeledPoint], RDD[LabeledPoint])] = MLUtils.kFold(data, numFolds, 0)

logInfo("Begin cross-validation")
val result: Array[(Double, Double)] = cv_data.par.map { case (training, validation) =>
  training.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
  validation.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

  val res: ParArray[(Double, Double)] = CV_params.par.zipWithIndex.map { case (p, i) =>
    // Training classifier
    val model = classificationModel(p, training)
    // Prediction
    val labelAndPreds = model.predictWithLabels(validation)
    // Metrics computation
    val bcm = new BinaryClassificationMetrics(labelAndPreds)
    logInfo("ROC: %s".format(bcm.roc().collect().map(_.toString).reduce(_ + " - " + _)))
    logInfo("PR: %s".format(bcm.pr().collect().map(_.toString).reduce(_ + " - " + _)))
    logInfo("auPR: %s".format(bcm.areaUnderPR().toString))
    logInfo("fMeasure: %s".format(bcm.fMeasureByThreshold().collect().map(_.toString).reduce(_ + " - " + _)))
    (bcm.areaUnderROC() / numFolds, bcm.areaUnderPR() / numFolds)
  }

  training.unpersist()
  validation.unpersist()
  res
}.reduce((s1,s2) => s1.zip(s2).map(t => (t._1._1 + t._2._1, t._1._2 + t._2._2))).toArray

val cv_roc = result.map(_._1)
val cv_pr = result.map(_._2)

logInfo("areaUnderROC: %s".format(cv_roc.map(_.toString).reduce( _ + " - " + _)))
logInfo("areaUnderPR: %s".format(cv_pr.map(_.toString).reduce( _ + " - " + _)))

// Extract best params
val which_max = (metric match {
  case "ROC" => cv_roc
  case "PR" => cv_pr
  case _ =>
    logWarning("Metrics set to default one: PR")
    cv_pr
}).zipWithIndex.maxBy(_._1)._2

best_values_array = CV_params(which_max)
CV_areaUnderROC = cv_roc
CV_areaUnderPR = cv_pr

EDIT

I launch it with

spark-submit \
    --properties-file spark.properties \
    --class theClass \
    --master yarn-client \
    --num-executors 50 \
    job.jar 

spark.properties is

spark.rdd.compress               true
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Djava.library.path=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
spark.io.compression.codec       org.apache.spark.io.SnappyCompressionCodec

spark.yarn.maxAppAttempts   1
yarn.log-aggregation-enable true

spark.executor.memory              5g
spark.yarn.executor.memoryOverhead 1024

spark.cassandra.connection.host            172.16.110.94
spark.cassandra.connection.timeout_ms      60000
spark.cassandra.connection.compression     SNAPPY
Pop

2 Answers


When submitting your job, you can limit the cores and memory it will have access to on the driver:

$ spark-submit \
    --class foo.bar.Main \
    --conf spark.driver.memory=1g \
    --conf spark.driver.cores=1 \
    --conf spark.executor.memory=10g \
    --conf spark.executor.cores=6 \
    /path/to/job.jar <options>

You can also change the number of executors available when running on YARN, as shown below.
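
For example (the executor count here is just a placeholder, not a recommendation):

    $ spark-submit \
        --class foo.bar.Main \
        --master yarn-client \
        --num-executors 25 \
        /path/to/job.jar <options>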

See the Spark Configuration Options document for the rest.

[edit]

I've just noticed that I can't really see anywhere in your code where you're using Spark. I don't see any Spark context or anywhere where you're calling parallelize on your collections (arrays). Without doing this, they won't be distributed and processed in parallel. I'm quite new to Spark myself, but I don't see how your code uses Spark, unless this is just a small portion of it.
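
As a minimal sketch of what I mean (the collection and variable names are made up, not taken from your code):

    // a hypothetical local collection sitting on the driver
    val localNumbers: Array[Int] = Array(1, 2, 3, 4, 5)

    // sc.parallelize turns it into an RDD whose partitions live on the executors
    val distributed: org.apache.spark.rdd.RDD[Int] = sc.parallelize(localNumbers)

    // this map now runs on the cluster, not on the driver
    val squared = distributed.map(n => n * n)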

Steven Bakhtiari
  • As I said in my question, I cannot use that many resources on my driver node. I am therefore searching for a solution that uses less memory and CPU on the driver and more on the executors (by modifying my code, for example). – Pop Jan 12 '16 at 08:16
  • Maybe there's a misunderstanding... I'm trying to point out that you can define the resources for the driver and the executors directly when invoking your job. There are also options to define the resources on your executors. How does this not answer your question? – Steven Bakhtiari Jan 12 '16 at 08:27
  • OK. Indeed, I had understood. However, I know that driver resources can be limited in the spark-submit call. In my case, decreasing spark.driver.memory leads to an OutOfMemory error. – Pop Jan 12 '16 at 10:10
  • I've just noticed that I can't really see anywhere in your code where you're using Spark. I don't see any Spark context or anywhere where you're calling parallelize on your collections (arrays). Without doing this, they won't be distributed and processed in parallel. I'm quite new to Spark myself, but I don't see how your code uses Spark, unless this is just a small portion of it. – Steven Bakhtiari Jan 12 '16 at 10:29
  • You're right: this is just a portion of it. I first read data from a file into an RDD. I'll edit that. – Pop Jan 12 '16 at 10:37
  • Where you're working with data that isn't in RDD format, have you attempted to parallelize it, i.e. `val rdd = sc.parallelize()` ? Also, as Ravindra pointed out, calling `collect()` on RDDs will download the data to your driver, which you probably do not want to do. Better to persist your output. – Steven Bakhtiari Jan 12 '16 at 15:18

I will tell you why you are eating up driver resources with the collect() API.

As per the Apache Spark documentation:

collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

If you have N partitions (in your case, 50 containers), the data from all partitions will be collected at the driver.

If you have a large dataset, collect() may cause an OutOfMemory error in the driver program.
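
As a rough sketch of the usual alternatives (someRdd and the output path are placeholders):

    // someRdd.collect() would ship every partition to the driver and can OOM it
    val firstHundred = someRdd.take(100)           // only the first 100 elements reach the driver
    someRdd.saveAsTextFile("hdfs:///tmp/output")   // results are written out by the executors
    someRdd.toLocalIterator.foreach(println)       // streams one partition at a time to the driver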

Have a look at some useful questions on how to handle this scenario:

Spark: Best practice for retrieving big data from RDD to local machine

Spark application fine tuning from cloudera blog

Ravindra babu
  • OK. But I only collect `bcm.roc()`, `bcm.pr()` and `bcm.fMeasureByThreshold()` whose length is 3 – Pop Jan 12 '16 at 12:03