
I have noticed that when I increase the number of slaves in my cluster, performance decreases. The machines I use have 2x10 cores and 18 GB of memory each; the first cluster has 5 slaves and the second 7. I get the same result on AWS with machines of 8 cores and 30 GB of memory, with 4 slaves on one hand and 8 slaves on the other.

I increased the level of parallelism in line with the number of cores, but nothing changed.

Moreover, I don't really understand the spark.driver.cores property: should I set it to the maximum number of cores I can?

Conf 5 slaves
                    .set("spark.driver.cores","10")
                    .set("spark.driver.memory","15g")
                    //.set("spark.akka.threads","16")
                    .set("spark.executor.memory","3g")
                    .set("spark.executor.cores","5")
                    .set("spark.executor.instances","20")
                    .set("spark.default.parallelism","100")

Conf 7 slaves

              .set("spark.driver.cores","10")
              .set("spark.driver.memory","15g")
              .set("spark.executor.memory","3g")
              .set("spark.executor.cores","5")
              .set("spark.executor.instances","28")
              .set("spark.default.parallelism","140")

Code

import spire.implicits._                                // element-wise Array arithmetic used in bary
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._  // provides topByKey
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.rdd.RDD._


// Wraps an id and its feature vector; Serializable so instances can be shipped to the executors.
case class Data_Object(private val id: String, private var vector: Vector) extends Serializable {
    def get_id = this.id
    def get_vector = this.vector
}

def bary(tab1: Array[(Vector, Double)], k: Int): Vector = {
    // Sum the vectors element-wise (Array + Array comes from spire.implicits._),
    // then divide each component by k to get the barycenter.
    val tab2 = tab1.map(_._1.toArray)
    val bary1 = tab2.reduce(_ + _).map(_ / k)
    Vectors.dense(bary1)
}
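
To show what bary does, here is a tiny example (the Double in each pair is the negated distance kept by topByKey; bary ignores it and only averages the vectors):

    // Two 2-D neighbours; their barycenter is the element-wise mean.
    val neighbours = Array(
        (Vectors.dense(1.0, 2.0), -0.5),
        (Vectors.dense(3.0, 4.0), -1.5)
    )
    bary(neighbours, 2)    // Vectors.dense(2.0, 3.0)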


    val data = sc.textFile("/ds10.csv")
    val parsedData = data.map(_.split(','))
                         .map(y => new Data_Object(y(0), Vectors.dense(y.tail.map(_.toDouble))))

    val k = 60
    val numPart2 = 10
    val maxIterForYstar = 10    // placeholder; set to the desired number of iterations
    var rdd_temp = parsedData
    parsedData.cache
    //parsedData.count = 10000

    for (ind <- 1 to maxIterForYstar) {

        // All pairs (cartesian product), keyed by the id of the left-hand point,
        // with the negated squared distance so that topByKey keeps the nearest neighbours.
        val rdd_distance = rdd_temp.cartesian(parsedData).map { case (x, y) =>
            (x.get_id, (y.get_vector, -Vectors.sqdist(x.get_vector, y.get_vector)))
        } //.repartition(1000)

        // k nearest neighbours per id, ordered on the (negated) distance.
        val rdd_knn_bykey = rdd_distance.topByKey(k)(Ordering[Double].on(x => x._2)).coalesce(numPart2)

        // Barycenter of each point's k nearest neighbours becomes the point for the next iteration.
        val rdd_knn_bary = rdd_knn_bykey.map(x => (x._1, bary(x._2, k)))

        rdd_temp = rdd_knn_bary.map(x => new Data_Object(x._1, x._2))
    }
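
Regarding the level of parallelism, this is how I check how the RDDs are actually split; the 100 passed to textFile below is just an example value for the minPartitions argument, not something I have tuned:

    // Number of partitions at each stage (partitions.size works on any RDD).
    println(s"parsedData partitions: ${parsedData.partitions.size}")
    println(s"rdd_temp partitions:   ${rdd_temp.partitions.size}")

    // textFile takes a minimum number of partitions as a second argument,
    // which forces the input to be split more finely (100 is only an example).
    val dataMoreSplits = sc.textFile("/ds10.csv", 100)
    println(s"with minPartitions = 100: ${dataMoreSplits.partitions.size}")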
KyBe
  • " var rdd_knn_bary = rdd_knn_bykey.map(x=>(x._1,bary(x._2,k)))". Where is "k" defined? – The Archetypal Paul Jul 22 '15 at 13:35
  • And "numPart2". Better, could you provide a minimal complete verifiable example? http://stackoverflow.com/help/mcve – The Archetypal Paul Jul 22 '15 at 13:42
  • Sorry, I have added the missing information. – KyBe Jul 22 '15 at 13:48
  • I get a parameter type mismatch on `var bary1 = tab2.reduce(_+_)` and a `reassignment to val` on `parsedData.count = 10000` (assuming `parsedData` is an `RDD`). So this isn't the code you're using, it seems. Can we get that MCVE? – The Archetypal Paul Jul 22 '15 at 13:55
  • How do you know the program scales at all? Please check runtimes for 1, 2, 3 slaves. You are not using HDFS; perhaps the whole exercise is disk bound. Adding more slaves will only help if the job is CPU bound and parallelizable. Your program doesn't make use of parallelize, see https://spark.apache.org/docs/latest/programming-guide.html#parallelized-collections – user568109 Jul 22 '15 at 16:57
  • I don't know if my program scales. I checked with 1, 2 and 3 slaves: it scores the same with one and two, and performance decreases with 3. I put my file in HDFS. sc.textFile returns an RDD, so it parallelizes my data; am I wrong, and if so, how should I do it? – KyBe Jul 22 '15 at 18:16
  • Yes, Spark should parallelize it. If you are reading a local file, it must be present on all machines, so check that. Also, how many partitions does the RDD have? If your file is small, parallelizing it will incur overhead and performance will decrease. Since it doesn't scale at all, make sure it has enough partitions to take advantage of the slaves. – user568109 Jul 23 '15 at 07:06
  • I tried setting the default parallelism from the number of cores up to 16x the number of cores, doubling at each step; nothing changed. I also tried modifying textFile("file", numPart) and got the same issue. As for the dataset size, I use datasets between 10k and 320k lines, but the cartesian squares the number of elements, so I think the RDD is big enough. – KyBe Jul 23 '15 at 13:03

0 Answers