
I am implementing k-means and I want to create the new centroids, but the mapping leaves one element out! However, when K has a smaller value, like 15, it works fine.

Here is the code I have:

val K = 25 // number of clusters
val data = sc.textFile("dense.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val count = data.count()
println("Number of records " + count)

var centroids = data.takeSample(false, K, 42).map(x => x._2)
do {
  var closest = data.map(p => (closestPoint(p._2, centroids), p._2))
  var pointsGroup = closest.groupByKey()
  println(pointsGroup)
  pointsGroup.foreach { println }
  var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
  //var newCentroids = pointsGroup.mapValues(ps => average(ps)).collectAsMap() this will produce an error
  println(centroids.size)
  println(newCentroids.size)
  for (i <- 0 until K) {
    tempDist += centroids(i).squaredDist(newCentroids(i))
  }
  ..

In the for loop, I get an error that a key is not found (which key is missing is not always the same; it depends on K):

java.util.NoSuchElementException: key not found: 2

Output before the error comes up:

Number of records 27776
ShuffledRDD[5] at groupByKey at kmeans.scala:72
25
24            <- IT SHOULD BE 25

What is the problem?


>>> println(newCentroids)
Map(23 -> (-0.0050852959701492536, 0.005512245104477607, -0.004460964477611937), 17 -> (-0.005459583045685268, 0.0029015278781725795, -8.451635532994901E-4), 8 -> (-4.691649213483123E-4, 0.0025375451685393366, 0.0063490755505617585), 11 -> (0.30361112034069937, -0.0017342255382385204, -0.005751167731061906), 20 -> (-5.839587918939964E-4, -0.0038189763756820145, -0.007067070459859708), 5 -> (-0.3787612396704685, -0.005814121628643806, -0.0014961713117870657), 14 -> (0.0024755681263616547, 0.0015191503267973836, 0.003411769193899781), 13 -> (-0.002657690932944597, 0.0077671050923225635, -0.0034652379980563263), 4 -> (-0.006963114731610361, 1.1751361829025871E-4, -0.7481135105367823), 22 -> (0.015318187079953534, -1.2929035958285013, -0.0044176372190034684), 7 -> (-0.002321059060773483, -0.006316359116022083, 0.006164669723756913), 16 -> (0.005341800955165691, -0.0017540737037037035, 0.004066574093567247), 1 -> (0.0024547379611650484, 0.0056298656504855955, 0.002504618082524296), 10 -> (3.421068671121009E-4, 0.0045169004751299275, 5.696239049740164E-4), 19 -> (-0.005453716071428539, -0.001450277556818192, 0.003860007248376626), 9 -> (-0.0032921685273631807, 1.8477108457711313E-4, -0.003070412228855717), 18 -> (-0.0026803160958904053, 0.00913904078767124, -0.0023528013698630146), 3 -> (0.005750011594202901, -0.003607098309178754, -0.003615918896940412), 21 -> (0.0024925166025641056, -0.0037607353461538507, -2.1588444871794858E-4), 12 -> (-7.920202960526356E-4, 0.5390774232894769, -4.928884539473694E-4), 15 -> (-0.0018608492323232324, -0.006973787272727284, -0.0027266663434343404), 24 -> (6.151173211963486E-4, 7.081812613784045E-4, 5.612962808842611E-4), 6 -> (0.005323933953732931, 0.0024014750473186123, -2.969338590956889E-4), 0 -> (-0.0015991676750160377, -0.003001317289659613, 0.5384176139563245))

A related question with a similar error: spark scala throws java.util.NoSuchElementException: key not found: 0 exception


EDIT:

After zero323 observed that two centroids were the same, I changed the code so that all the centroids are unique. However, the behaviour remains the same. For that reason, I suspect that closestPoint() may return the same index for two different centroids. Here is the function:

  // Returns the index of the centroid in centers that is closest to p.
  def closestPoint(p: Vector, centers: Array[Vector]): Int = {
    var index = 0
    var bestIndex = 0
    var closest = Double.PositiveInfinity
    for (i <- 0 until centers.length) {
      val tempDist = p.squaredDist(centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }
    return bestIndex
  }

How can I get around this? I am running the code on a Spark cluster, launched with spark-submit as shown in the comments below.
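
For reference, here is a small diagnostic one could add inside the loop to see which cluster indices end up with no points assigned. It is only a sketch, reusing the closest and K values from the code above:

// Count how many points each cluster index received; any index missing from this map
// got no points at all, and will therefore also be missing from newCentroids.
val assignmentCounts = closest.countByKey()
val emptyClusters = (0 until K).filterNot(assignmentCounts.contains)
println("Cluster indices with no assigned points: " + emptyClusters.mkString(", "))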

gsamaras
  • Thanks for the upvote, I really appreciate it 'cause I am *really* stuck here! – gsamaras Feb 13 '16 at 00:43
  • Quick check `centroids.map(_.toString).distinct.size` – zero323 Feb 13 '16 at 01:43
  • @zero323 24! So we just checked how many unique centroids we have? I can confirm that there are two centroids that are the same, while the others seem unique. – gsamaras Feb 13 '16 at 01:52
  • Hence the answer (I guess) :) – zero323 Feb 13 '16 at 02:09
  • I am afraid not, @zero323 :/ I modified the code to read the centroids from a file (where I have manually placed 25 unique centroids). The quick check you suggested gives 25, while `newCentroids.size` is still 24! :/ *Please* check my update. – gsamaras Feb 13 '16 at 02:23
  • Do you run this in a new session? If not, the RDD is most likely cached on shuffle. Also, please don't use `vars`. And don't use `groupByKey`. – zero323 Feb 13 '16 at 02:23
  • I am not sure what a new session is. I updated the question with the way I run it. It boils down to this: `bin/spark-submit --class "WikipediaKMeans" --master spark://gsamaras:7077 target/scala-2.10/wikipediakmeans-project_2.10-1.0.jar` @zero323 – gsamaras Feb 13 '16 at 02:25
  • Do you still sample centroids from `data`? – zero323 Feb 13 '16 at 02:29
  • I actually sample the points from the file I mentioned, @zero323, but that's not the point, *I think*, because even if I set it up nicely for the first step, as k-means advances there may again be identical centroids. So what we really need here is to construct 25 `newCentroids`, without worrying about identical centroids. We could, for example, leave that code the same and, if the centroids are 24, append a random point from the dataset to `newCentroids`... However, I am so new to Scala that I have a hard time doing that. – gsamaras Feb 13 '16 at 02:33
  • Well, your approach can fail even if the points are distinct but no points are assigned to a given cluster. Standard k-means updates centers; if there are no points assigned to a given cluster, its centroid should stay exactly where it is. – zero323 Feb 13 '16 at 02:44
  • @zero323 the edit in your last comment was the key! I had forgotten to change that. So please go ahead and post an answer to the initial question, i.e. that we were experiencing this kind of behaviour because of the duplicate centroids. ;) – gsamaras Feb 13 '16 at 02:47
  • Maybe tomorrow, if I don't forget. Have fun :) – zero323 Feb 13 '16 at 02:54

1 Answer


It can happen in the "E-step" (the assignment of points to cluster-indices is analogous to the E-step of the EM algorithm) that one of your indices will not be assigned any points. If this happens then you need to have a way of associating that index with some point, otherwise you're going to wind up with fewer clusters after the "M-step" (the assignment of centroids to the indices is analogous to the M-step of the EM algorithm.) Something like this should probably work:

val newCentroids = {
  // Centroids for the cluster indices that actually received points.
  val temp = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
  // Fill in the remaining indices with randomly sampled points.
  val nMissing = K - temp.size
  val sample = data.takeSample(false, nMissing, seed)
  var c = -1
  (for (i <- 0 until K) yield {
    val point = temp.getOrElse(i, { c += 1; sample(c) })
    (i, point)
  }).toMap
}

Just substitute that code for the line you are currently using to compute newCentroids.

There are other ways of dealing with this issue, and the approach above is probably not the best (is it a good idea to call takeSample multiple times, once for each iteration of the k-means algorithm? what if data contains a lot of repeated values? etc.), but it is a simple starting point.
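
For instance, one of those other ways, hinted at by zero323 in the comments under the question, is to leave an empty cluster's centroid exactly where it is instead of sampling a new point. Here is a rough sketch, reusing the pointsGroup, average, centroids and K names from your code:

val newCentroids = {
  // Average of the points assigned to each cluster index that received any.
  val temp = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
  // Any index with no assigned points keeps its previous centroid unchanged.
  (0 until K).map(i => (i, temp.getOrElse(i, centroids(i)))).toMap
}

This avoids repeated sampling, although a cluster that stays empty will keep its old centroid indefinitely.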

By the way, you might want to think about how you can replace the groupByKey with a reduceByKey.
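
To give a rough idea of what that could look like, here is a minimal sketch. It assumes the data, centroids and closestPoint from your question, and a vector type that supports elementwise addition and scaling by a Double (as org.apache.spark.util.Vector does):

// Sum the vectors and count the points for each cluster index in a single reduceByKey,
// then divide each sum by its count; whole groups are never shipped across the shuffle.
val closest = data.map(p => (closestPoint(p._2, centroids), (p._2, 1)))
val pointStats = closest.reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }
val newCentroids = pointStats.map { case (i, (sum, n)) => (i, sum * (1.0 / n)) }.collectAsMap()

This still leaves the empty-cluster problem (a missing key for any index with no points), so it would be combined with one of the fallbacks above.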

Note: For the curious, here's a reference describing the similarities between the EM-algorithm and the k-means algorithm: http://papers.nips.cc/paper/989-convergence-properties-of-the-k-means-algorithms.pdf.

Jason Scott Lenderman
  • Sorry, I just noticed that @zero323 answered the question to the OP's satisfaction in the comments. I'll keep my answer here for now since I think it is still useful. – Jason Scott Lenderman Feb 13 '16 at 04:19
  • Good decision to keep the answer. I would suggest you update it with a link to, or a small description of, the EM algorithm; I do not know about it, since I am new... – gsamaras Feb 13 '16 at 18:00
  • The error: `type mismatch; found : Serializable, required: org.apache.spark.util.Vector, tempDist += centroids(i).squaredDist(newCentroids(i))` and the error is in `newCentroids(i)` – gsamaras Feb 13 '16 at 19:19
  • However, I am more interested in "By the way, you might want to think about how you can replace the groupByKey with a reduceByKey.". Maybe I should post a new question? – gsamaras Feb 13 '16 at 19:21
  • I don't know what's going on with the error. The type of `newCentroids` is, I believe, `Map[Int, Vector]`; so the type of `newCentroids(i)` is `Vector`, which appears to be what you need there. – Jason Scott Lenderman Feb 13 '16 at 19:36
  • By the way, randomly reassigning empty clusters to random points (in the way I suggested in the computation of `newCentroids`) is not going to work well with this stopping condition (i.e. what is being accumulated in `tempDist`). You need to take this into account and modify your stopping condition appropriately. I generally base my stopping condition on the change in the actual error, rather than the change in the model parameters, so this issue wouldn't arise (see the sketch after these comments). – Jason Scott Lenderman Feb 13 '16 at 19:43
  • Yes, if you post a new question I can try to answer that issue (i.e. `groupByKey` versus `reduceByKey` or `aggregateByKey`) there. – Jason Scott Lenderman Feb 13 '16 at 19:47
  • Jason, yes, I agree, so I am keeping the implementation where I read the centroids from the file, and everything works fine. Now, let's move on to the interesting question: http://stackoverflow.com/questions/35388277/replace-groupbykey-with-reducebykey – gsamaras Feb 14 '16 at 04:13
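
For reference, here is a rough sketch of the cost-based check mentioned in the comment above (a sketch only, reusing the data, centroids and closestPoint names from the question; the convergence threshold itself is left to the user):

// Total within-cluster squared distance for the current centroids. A stopping rule can
// compare this cost between successive iterations instead of comparing the centroids
// themselves, so re-seeding an empty cluster does not break the convergence test.
val cost = data.map(p => p._2.squaredDist(centroids(closestPoint(p._2, centroids)))).sum()
println("Current k-means cost: " + cost)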