I have noticed that when I increase the number of slaves in my cluster, performance decreases. The machines I use have 2x10 cores and 18 GB of memory; the first cluster has 5 slaves and the second has 7. I get the same result on AWS with machines of 8 cores and 30 GB of memory, with 4 slaves on one side and 8 on the other.
I increased the level of parallelism in line with the number of cores, but nothing changed.
Moreover, I don't really understand the spark.driver.cores property: should I set it to the maximum number of cores available?
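For clarity, this is the arithmetic behind the figures below (a sketch; the 4-executors-per-slave split and the value names are mine, just for illustration):

// Sizing sketch: 4 executors per slave, 5 cores each (my assumption).
val slaves = 5                                        // 7 for the second conf
val executorInstances = slaves * 4                    // 20 (28 with 7 slaves)
val executorCores = 5
val totalCores = executorInstances * executorCores    // 100 (140 with 7 slaves)
val defaultParallelism = totalCores                   // one task per core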
Conf 5 slaves
.set("spark.driver.cores","10")
.set("spark.driver.memory","15g")
//.set("spark.akka.threads","16")
.set("spark.executor.memory","3g")
.set("spark.executor.cores","5")
.set("spark.executor.instances","20")
.set("spark.default.parallelism","100")
Conf 7 slaves
.set("spark.driver.cores","10")
.set("spark.driver.memory","15g")
.set("spark.executor.memory","3g")
.set("spark.executor.cores","5")
.set("spark.executor.instances","28")
.set("spark.default.parallelism","140")
Code
import spire.implicits._
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.rdd.RDD._
// Wraps an id together with its feature vector, exposed through getters.
case class Data_Object(private val id: String, private var vector: Vector) extends Serializable {
  def get_id = this.id
  def get_vector = this.vector
}
// Component-wise mean (barycenter) of the k vectors in tab1; the Double in
// each pair (the distance score) is ignored. The element-wise `+` on arrays
// comes from spire.implicits._.
def bary(tab1: Array[(Vector, Double)], k: Int): Vector = {
  val tab2 = tab1.map(_._1.toArray)
  val sum = tab2.reduce(_ + _)        // spire: element-wise array addition
  Vectors.dense(sum.map(_ / k))
}
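As a sanity check, a minimal hypothetical example of what bary computes (the Double in each pair is the negated squared distance and is ignored):

val neighbours = Array(
  (Vectors.dense(1.0, 2.0), -0.5),
  (Vectors.dense(3.0, 4.0), -1.0)
)
bary(neighbours, 2)   // Vectors.dense(2.0, 3.0), the component-wise mean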
val data = sc.textFile("/ds10.csv")
// Each line: an id followed by the vector components, comma-separated.
var parsedData = data.map(x => x.split(',')).map(y => new Data_Object(y(0), Vectors.dense(y.tail.map(_.toDouble))))
var k = 60
var numPart2 = 10
var rdd_temp = parsedData
parsedData.cache
// parsedData.count is 10000
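For reference, the parsing above expects lines shaped like this (hypothetical values):

// Hypothetical input line and how it is parsed:
val line = "id42,0.13,1.70,0.98"
val fields = line.split(',')
new Data_Object(fields(0), Vectors.dense(fields.tail.map(_.toDouble)))
// -> id "id42" with vector [0.13, 1.70, 0.98]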
for (ind <- 1 to maxIterForYstar) {   // maxIterForYstar is defined elsewhere
  // Pairwise (negated) squared distances between the current points and the data.
  var rdd_distance = rdd_temp.cartesian(parsedData).flatMap { case (x, y) =>
    Some((x.get_id, (y.get_vector, -Vectors.sqdist(x.get_vector, y.get_vector))))
  } //.repartition(1000)
  // Per id, keep the k nearest neighbours (largest negated squared distances).
  var rdd_knn_bykey = rdd_distance.topByKey(k)(Ordering[Double].on(x => x._2)).coalesce(numPart2)
  // Replace each point by the barycenter of its k nearest neighbours.
  var rdd_knn_bary = rdd_knn_bykey.map(x => (x._1, bary(x._2, k)))
  rdd_temp = rdd_knn_bary.map(x => new Data_Object(x._1, x._2))
}
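Nothing inside the loop triggers a job, so the whole chain of cartesian/topByKey stages only runs once an action is called afterwards; a sketch of that step (the collect is my addition, not part of the original code):

// Hypothetical action to force evaluation; each iteration above only
// extends the RDD lineage lazily.
val result = rdd_temp.map(d => (d.get_id, d.get_vector)).collect()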