2

I'm trying to write sentiment analysis program based on Spark. To do this I'm using word2vec and KMeans clustering. From word2Vec I've got 20k word/vectors collection in 100 dimension space and now I'm trying to clusterize this vectors space. When I run KMeans with default parallel implementation the algorithm worked 3 hours! But with random initialization strategy it was like 8 minutes. What am I doing wrong? I have mac book pro machine with 4 kernels processor and 16 GB of RAM.

K ~= 4000 maxInteration was 20

var vectors: Iterable[org.apache.spark.mllib.linalg.Vector] =
      model.getVectors.map(entry => new VectorWithLabel(entry._1, entry._2.map(_.toDouble)))
    val data = sc.parallelize(vectors.toIndexedSeq).persist(StorageLevel.MEMORY_ONLY_2)
    log.info("Clustering data size {}",data.count())
    log.info("==================Train process started==================");
    val clusterSize = modelSize/5

    val kmeans = new KMeans()
    kmeans.setInitializationMode(KMeans.K_MEANS_PARALLEL)
    kmeans.setK(clusterSize)
    kmeans.setRuns(1)
    kmeans.setMaxIterations(50)
    kmeans.setEpsilon(1e-4)

    time = System.currentTimeMillis()
    val clusterModel: KMeansModel = kmeans.run(data)

And spark context initialization is here:

val conf = new SparkConf()
      .setAppName("SparkPreProcessor")
      .setMaster("local[4]")
      .set("spark.default.parallelism", "8")
      .set("spark.executor.memory", "1g")
    val sc = SparkContext.getOrCreate(conf)

Also few updates about running this program. I'm running it inside Intelij IDEA. I don't have real Spark cluster. But I thought that your personal machine can be Spark cluster

I saw that the program hangs inside this loop from Spark code LocalKMeans.scala:

// Initialize centers by sampling using the k-means++ procedure.
    centers(0) = pickWeighted(rand, points, weights).toDense
    for (i <- 1 until k) {
      // Pick the next center with a probability proportional to cost under current centers
      val curCenters = centers.view.take(i)
      val sum = points.view.zip(weights).map { case (p, w) =>
        w * KMeans.pointCost(curCenters, p)
      }.sum
      val r = rand.nextDouble() * sum
      var cumulativeScore = 0.0
      var j = 0
      while (j < points.length && cumulativeScore < r) {
        cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
        j += 1
      }
      if (j == 0) {
        logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
          s" Using duplicate point for center k = $i.")
        centers(i) = points(0).toDense
      } else {
        centers(i) = points(j - 1).toDense
      }
    }
zero323
  • 322,348
  • 103
  • 959
  • 935
Igor Masternoy
  • 436
  • 1
  • 11
  • 36
  • What is your question ? – eliasah Jan 05 '16 at 18:04
  • Why it runs so slow in Parallel mode? – Igor Masternoy Jan 05 '16 at 21:38
  • You'll need to check the size of the data, the DAG scheduler schema in the Spark UI, monitor your cluster like said in the answer below, etc. it may depend on several facts. Your question is not salvageable as is. – eliasah Jan 05 '16 at 21:41
  • Spark UI says that all job done... It does some transformation before go to kMeansPlusPlus lagorithm. I think that the problem is somewhere in the code i've posted, because it takes 80% of program – Igor Masternoy Jan 05 '16 at 22:07
  • Where do you define k for the loop? And how much it is? – eliasah Jan 05 '16 at 22:08
  • Oh I'm so sorry. k equal 20k / 5 ~= 4000 – Igor Masternoy Jan 05 '16 at 22:30
  • 1
    Well this is the problem, a loop is not parallelized. So complexity-wise consider that a kmeans is in O(nnz k + d k) if k and d are fixed times the 4000 iterations, I think it can give you some clues... – eliasah Jan 06 '16 at 08:22

2 Answers2

1

Initialisation using KMeans.K_MEANS_PARALLEL is more complicated then random. However, it shouldn't make such a big difference. I would recommend to investigate, whether it is the parallel algorithm which takes to much time (it should actually be more efficient then KMeans itself).

For information on profiling see: http://spark.apache.org/docs/latest/monitoring.html

If it is not the initialisation which takes up the time there is something seriously wrong. However, using random initialisation shouldn't be any worse for the final result (just less efficient!).

Actually when you use KMeans.K_MEANS_PARALLEL to initialise you should get reasonable results with 0 iterations. If this is not the case there might be some regularities in the distribution of the data which send KMeans offtrack. Hence, if you haven't distributed your data randomly you could also change this. However, such an impact would surprise me give a fixed number of iterations.

CAFEBABE
  • 3,983
  • 1
  • 19
  • 38
  • Thank you for the answer. I've read Matei papers about the Spark, but I'm still not familiar with Spark. I'm launching this script from the IDE (InteliJ IDEA) and can debug it. I'm not sure that this is the right way to use Spark, but I didn't see any reasons to not launch Spark program inside IDE. I'm using macbook pro machine that can gurantee 4 process run in parallel (it has 4 kernels!) , also it has 16 GB RAM that enough for 20k words :) So I debugged my application and found that it hangs in LocalKMeans.scala in the loop from 49 - 54 line. – Igor Masternoy Jan 05 '16 at 21:37
1

I've run spark on AWS with 3 slaves (c3.xlarge) and the result is the same - problem is that parallel KMeans initialize algo in N parallel runs, but it's still extremely slow for small amount of data, my solution is to contionue using Random initialization. Data size approximately: 4k clusters for 21k 100-dim vectors.

Igor Masternoy
  • 436
  • 1
  • 11
  • 36