I am running a cross-validation study on 50 containers of a YARN cluster. The data set has about 600,000 lines.
The job works well most of the time, but it uses a lot of RAM and CPU on the cluster's driver server (the machine where the job is started): 3 out of 4 CPU cores. I cannot use that many resources, because this server is shared by several people.
My questions are:
- Why is my code using so much resources on the driver?
- How could I modify it so that it consumes fewer resources on the driver and more on cluster nodes?
I do not know Spark well, but my guess for the first question is that I should use RDDs rather than Arrays and ParArrays; I cannot figure out how, though...
Here is my code:
// setMaster/set are SparkConf methods (SparkContext has neither), so build the configuration
// first and hand it to the context. Values set on a SparkConf in code take precedence over
// spark-submit flags, so a hard-coded setMaster silently overrides --master. setIfMissing keeps
// "yarn-client" only as a fallback, which lets spark-submit choose the mode, e.g.
// `--master yarn --deploy-mode cluster`. That mode runs the driver inside a YARN container
// instead of on the submitting machine.
val sc: SparkContext = new SparkContext(
  new org.apache.spark.SparkConf()
    .setIfMissing("spark.master", "yarn-client")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "com.amadeus.ssp.tools.SSPKryoRegistrator"))
val data = sc.textFile("...").map(some pre-treatment...)
// Parameters: each array below lists the candidate values of one hyper-parameter; the
// cross-validation grid is their Cartesian product.
// numModels = number of majority-class subsamples, i.e. forests, per ensemble.
val numModels = Array(5)
val trainingRatioMajMin = 0.7
// Tree Ensemble
val numTrees = Array(50)
val maxDepth = Array(30)
val maxBins = Array(100)
// RF
val featureSubsetStrategy = Array("sqrt")
val subsamplingRate = Array(1.0)
// Class for model
/** Averaging ensemble over several random forests.
  *
  * The score for a point is the mean of the individual forests' predictions.
  * Extends Serializable because the RDD closures below capture `this`. Spark has to serialize
  * the instance, including the forests it holds, in order to run those closures on executors.
  */
class Model(model: Array[RandomForestModel]) extends Serializable {

  /** Scores every vector of `data` with the averaged ensemble. */
  def predict(data: RDD[Vector]): RDD[Double] =
    data.map(p => predict(p))

  /** Returns (label, prediction) pairs, LABEL FIRST.
    *
    * BinaryClassificationMetrics expects (score, label), so swap the pair before handing it
    * over.
    */
  def predictWithLabels(data: RDD[LabeledPoint]): RDD[(Double, Double)] =
    data.map(p => (p.label, predict(p.features)))

  /** Mean of the forests' predictions for `point` (NaN if the ensemble is empty: 0.0 / 0). */
  def predict(point: Vector): Double =
    model.map(m => m.predict(point)).sum / model.length
}
/** Hyper-parameter grid: one row per combination of the candidate arrays.
  *
  * Each row is laid out as
  * (numTrees, maxDepth, maxBins, featureSubsetStrategy, subsamplingRate, numModels).
  * The left-most array varies slowest.
  */
val CV_params: Array[Array[Any]] =
  numTrees.flatMap { trees =>
    maxDepth.flatMap { depth =>
      maxBins.flatMap { bins =>
        featureSubsetStrategy.flatMap { strategy =>
          subsamplingRate.flatMap { rate =>
            numModels.map { models =>
              Array[Any](trees, depth, bins, strategy, rate, models)
            }
          }
        }
      }
    }
  }
// Sampling dataset
/** Splits `dataset` by class and draws several subsamples of the majority class.
  *
  * @param dataset    labelled points. Label 0.0 is the majority class and 1.0 the minority class;
  *                   points with any other label appear in neither output.
  * @param fraction   expected fraction of majority points kept in each subsample (drawn without
  *                   replacement)
  * @param numSamples number of majority subsamples to draw. It defaults to `params(5)`, the
  *                   numModels column of a CV_params row, which this method previously read
  *                   directly.
  * @return (the majority-class subsamples, the complete minority-class RDD)
  */
def sampling(dataset: RDD[LabeledPoint], fraction: Double,
             numSamples: Int = params(5).asInstanceOf[Int]): (Array[RDD[LabeledPoint]], RDD[LabeledPoint]) = {
  logInfo("Begin Sampling")
  val dataset_maj = dataset.filter(_.label == 0.0)
  val dataset_min = dataset.filter(_.label == 1.0)
  // The former persist()/unpersist() pair around this point has been removed. sample() is lazy,
  // so the cache was released before any job could populate it. If re-filtering ever matters,
  // the caller should cache dataset_maj and release it only after training has run.
  val majSamples = Array.tabulate(numSamples)(_ => dataset_maj.sample(false, fraction))
  logInfo("End Sampling")
  (majSamples, dataset_min)
}
// Train the model
/** Trains one random forest per majority-class subsample of `training` and wraps them in an ensemble.
  *
  * Each forest is trained on its subsample together with all minority points. The forests are
  * combined in an averaging ensemble.
  *
  * @param params   one CV_params row:
  *                 (numTrees: Int, maxDepth: Int, maxBins: Int,
  *                  featureSubsetStrategy: String, subsamplingRate: Double, numModels: Int)
  * @param training the training fold; only these points are ever used for fitting
  * @return the ensemble.
  *         NOTE(review): the declared return type differs from the `Model` constructed below;
  *         confirm they are the same type, or that Model conforms to it.
  */
def classificationModel(params:Array[Any], training:RDD[LabeledPoint]) : EasyEnsemble_Classifier_Model = {
  // Sample from the training fold only. Sampling from the full `data` set leaked validation
  // points into training and inflated the cross-validated scores.
  val (samples, data_min) = sampling(training, fraction)
  data_min.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
  // The strategy depends only on `params`, so build it once rather than once per subsample.
  val strategy = new Strategy(
    Algo.Classification,
    org.apache.spark.mllib.tree.impurity.Gini,
    params(1).asInstanceOf[Int],    // maxDepth
    numClasses,
    params(2).asInstanceOf[Int],    // maxBins
    QuantileStrategy.Sort,
    categoricalFeaturesInfo,
    1,                              // minInstancesPerNode
    0.0,                            // minInfoGain
    256,                            // maxMemoryInMB
    params(4).asInstanceOf[Double], // subsamplingRate
    false,                          // useNodeIdCache
    10)                             // checkpointInterval
  // Train the forests one after another. Each trainClassifier call already distributes its
  // work over the executors. Launching them concurrently through .par only multiplied the
  // driver threads and the driver memory held for in-progress trees, which is the load
  // observed on the submitting server. Results are unchanged: the seed is fixed at 0.
  val models = samples.map { sample =>
    val model = RandomForest.trainClassifier(sample ++ data_min, strategy,
      params(0).asInstanceOf[Int], params(3).asInstanceOf[String], 0)
    logInfo(s"RF - totalNumNodes: ${model.totalNumNodes} - numTrees: ${model.numTrees}")
    model
  }
  data_min.unpersist()
  logInfo(s"RF: End RF training\n")
  new Model(models)
}
///// Cross-validation
// kFold yields one (training, validation) RDD pair per fold.
val cv_data:Array[(RDD[LabeledPoint], RDD[LabeledPoint])] = MLUtils.kFold(data, numFolds, 0)
logInfo("Begin cross-validation")
// For each fold, train and score every CV_params row. Each area is divided by numFolds, so the
// fold-wise sum taken by the reduce below is the mean over folds.
// NOTE: both .par levels submit Spark jobs concurrently from driver threads. Driver CPU and
// memory therefore grow with the number of (fold, parameter row) pairs in flight.
val result : Array[(Double, Double)] = cv_data.par.map { case (training, validation) =>
  training.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
  validation.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
  val res: ParArray[(Double, Double)] = CV_params.par.map { p =>
    // Training classifier
    val model = classificationModel(p, training)
    // Prediction: predictWithLabels yields (label, prediction), but BinaryClassificationMetrics
    // expects (score, label). Without the swap, labels would be treated as scores and
    // predictions as labels. The result is persisted so the ensemble is evaluated once rather
    // than once per metric job.
    val scoreAndLabels = model.predictWithLabels(validation)
      .map { case (label, score) => (score, label) }
      .persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
    // Metrics computation
    val bcm = new BinaryClassificationMetrics(scoreAndLabels)
    logInfo("ROC: %s".format(bcm.roc().collect().map(_.toString).mkString(" - ")))
    logInfo("PR: %s".format(bcm.pr().collect().map(_.toString).mkString(" - ")))
    val auPR = bcm.areaUnderPR() // computed once; reused for the log line and the result
    logInfo("auPR: %s".format(auPR.toString))
    logInfo("fMeasure: %s".format(bcm.fMeasureByThreshold().collect().map(_.toString).mkString(" - ")))
    val auROC = bcm.areaUnderROC()
    // Release the RDDs cached by the metrics object and our predictions. Otherwise they
    // accumulate on the executors across folds and parameter rows.
    bcm.unpersist()
    scoreAndLabels.unpersist()
    (auROC / numFolds, auPR / numFolds)
  }
  training.unpersist()
  validation.unpersist()
  res
}.reduce { (s1, s2) =>
  // Element-wise sum of the per-fold (auROC, auPR) rows.
  s1.zip(s2).map { case ((roc1, pr1), (roc2, pr2)) => (roc1 + roc2, pr1 + pr2) }
}.toArray
val cv_roc = result.map(_._1)
val cv_pr = result.map(_._2)
logInfo("areaUnderROC: %s".format(cv_roc.map(_.toString).mkString(" - ")))
logInfo("areaUnderPR: %s".format(cv_pr.map(_.toString).mkString(" - ")))
// Extract best params
// Choose the metric series to optimise; any value other than "ROC" or "PR" falls back to PR
// with a warning. Then take the index of the first CV_params row that reaches the maximum.
val which_max = {
  val selectedMetric = metric match {
    case "ROC" => cv_roc
    case "PR" => cv_pr
    case _ =>
      logWarning("Metrics set to default one: PR")
      cv_pr
  }
  selectedMetric.indices.maxBy(i => selectedMetric(i))
}
best_values_array = CV_params(which_max)
CV_areaUnderROC = cv_roc
CV_areaUnderPR = cv_pr
EDIT
I launch the job with the following command:
# Submit the job. With --master yarn-client the driver runs in this process, on the machine
# where the command is typed, so any work the driver does locally (e.g. .par collections) uses
# this machine's CPU and RAM.
spark-submit \
  --master yarn-client \
  --num-executors 50 \
  --class theClass \
  --properties-file spark.properties \
  job.jar
and `spark.properties` contains:
spark.rdd.compress true
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Djava.library.path=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
# The codec class needs its key. A bare class name on its own line is parsed as a key with an
# empty value, so the codec was never selected.
spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
spark.yarn.maxAppAttempts 1
# NOTE(review): this is not a spark.* key, so spark-submit ignores it in a properties file.
# YARN log aggregation is normally enabled in yarn-site.xml on the cluster.
yarn.log-aggregation-enable true
spark.executor.memory 5g
spark.yarn.executor.memoryOverhead 1024
spark.cassandra.connection.host 172.16.110.94
spark.cassandra.connection.timeout_ms 60000
spark.cassandra.connection.compression SNAPPY