
KMeans has several training parameters, and the initialization mode defaults to k-means||. The problem is that the job moves quickly (in less than 10 minutes) through the first 13 stages, but then hangs completely, without reporting an error!

Minimal example that reproduces the issue (it succeeds if I use 1000 points or random initialization):

from pyspark.context import SparkContext

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs


if __name__ == "__main__":
    sc = SparkContext(appName='kmeansMinimalExample')

    # same with 10000 points
    data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
    C = KMeans.train(data, 8192, maxIterations=10)

    sc.stop()

The job does nothing (it neither succeeds, fails, nor progresses), as shown below. There are no active or failed tasks in the Executors tab, and the stdout and stderr logs contain nothing particularly interesting:

[screenshot]

If I use k=81 instead of 8192, it succeeds:

[screenshot]

Note that the two calls to takeSample() should not be an issue, since it was also called twice in the random initialization case.

So, what is happening? Is Spark's KMeans unable to scale? Does anybody know? Can you reproduce it?


If it were a memory issue, I would get warnings and errors, as I did before.
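As a rough sanity check on the memory question, the raw size of the minimal example's data can be estimated with simple arithmetic. This is only a sketch: it assumes 8-byte double-precision values and ignores object overhead, serialization, and any copies made between Python and the JVM, so real usage will be higher. The helper name `raw_bytes` is hypothetical.

```python
def raw_bytes(rows, dims, bytes_per_value=8):
    """Raw payload size of a dense rows x dims matrix (assumes 8-byte doubles)."""
    return rows * dims * bytes_per_value


# Numbers taken from the minimal example: 10,000,000 points, 64 dimensions, k = 8192.
points_bytes = raw_bytes(10_000_000, 64)
centers_bytes = raw_bytes(8192, 64)

print(points_bytes / 2**30)   # about 4.77 GiB for the points
print(centers_bytes / 2**20)  # 4.0 MiB for one set of 8192 centers
```

Under these assumptions the centers themselves are tiny compared to the points, so any memory pressure would have to come from intermediate structures rather than from the final model.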

Note: placeybordeaux's comments are based on running the job in client mode, where the driver's configurations are invalidated, causing exit code 143 and the like (see edit history). In cluster mode, no error is reported at all; the application just hangs.


From zero323: "Why is Spark Mllib KMeans algorithm extremely slow?" is related, but I think that poster sees some progress, while my job hangs. I left a comment there.


gsamaras
  • exit code 143 indicates that the container was killed for exceeding the memory allocated for the job. How much RAM was given to each container for heap size? Can you increase this? – placeybordeaux Sep 01 '16 at 00:34
  • @Michael 1.6.2, updated! What do you mean by other classifiers? I care only about kmeans. @placeybordeaux indeed. I wonder, in the driver or in the executors? Or both? However, notice that these messages seem to come from stages that are **completed**, not *failed*. I will try increasing that anyway to see what happens... Please, guys, if you find this question interesting, upvote to gather more attention and resolve the mystery! :) – gsamaras Sep 01 '16 at 01:33
  • There might not be an easy answer, you have two major factors that contribute to how much memory is needed: the number of data points and the dimensionality of the data. Can you reduce either and get a coherent result? Alternatively increase the RAM available to each machine. You might also want to try reducing the number of cores on the executors. If you are getting failures due to GC overhead limit then doing more tasks with less memory won't help. – placeybordeaux Sep 01 '16 at 18:23
  • @placeybordeaux I cannot decrease the number of points, but I did perform dimensionality reduction, and 64 dimensions already sounds too low. So, which configuration exactly do you advise me to try? I have the flags right there in my question.. :) – gsamaras Sep 01 '16 at 18:31
  • @Michael I did update my question, check it out! – gsamaras Sep 01 '16 at 22:36
  • @placeybordeaux the exit code 143 was happening only in client mode. Now, with my minimal example, I don't see them, when submitting my job. So, this must not be the issue.. :/ – gsamaras Sep 01 '16 at 23:20
  • What dimensionality reduction did you use (PCA,SVD or something else) – mobcdi Sep 02 '16 at 04:54
  • @Michael PCA, but that shouldn't matter. As you see I have updated the question with a super minimal example that reproduces the behavior, indicating that something may be wrong with Spark's KMeans...I mean in the minimal example, I create fresh data (so no PCA here). You see the question wants to resolve the mystery with Spark's KMeans, because I had heard people complaining about it 1 year before, and they would just not use it, but I had to know what is happening, and submit a bug if others can reproduce it too! Can you? You see, if it was just for my project, I would use random initializat – gsamaras Sep 02 '16 at 05:11
  • I'm not experienced enough in pyspark or spark to be of use to you, but from a quick read of SO, if you want to debug you might have to switch to Java. – mobcdi Sep 02 '16 at 05:25
  • http://stackoverflow.com/a/33328543/4700378 – mobcdi Sep 02 '16 at 05:25
  • I am unable to do so @Michael, but thanks for the tip. You see I am interested in Python only at the moment. You helped already by upvoting! :D – gsamaras Sep 02 '16 at 05:27
  • Related to [Why is Spark Mllib KMeans algorithm extremely slow?](http://stackoverflow.com/q/35512139/1560062) – zero323 Sep 02 '16 at 16:49
  • Correct @zero323, my last update confirms that reducing the number of iterations won't help when `k` is 8192. However, that guy says the job is just slow, not that it *hangs* as in my case. I mean there is **nothing happening** in my job, no active tasks/stages/executors. Do you think it's the same issue? – gsamaras Sep 02 '16 at 16:52
  • Either that or something closely related but it is just a hunch. – zero323 Sep 02 '16 at 17:09
  • @zero323 check my update. Since I was thinking about `k` as well, and you did too, that says a lot! Can you please run my minimal example in your system, to see if you can reproduce the issue? :) – gsamaras Sep 02 '16 at 17:22
  • I can only test it on my workstation and although it is sloooow it seems to proceed. – zero323 Sep 02 '16 at 17:45
  • @zero323 you mean it got past the 14th stage that I reached, right? That should be the case, since in the mailing list, another guy was able to run this and said "For example, I think treeAggregate might work better. An Array[Float] may be just fine and cut down memory usage, etc.".."more specifically, since Spark 2.0 the "runs" parameter in the KMeans mllib implementation has been ignored and is always 1. This means a lot of code that wraps this stuff up in arrays could be simplified quite a lot. " But I have no idea what modification I would make to that min. example, and no treeAggreg – gsamaras Sep 02 '16 at 17:48
  • Oh now I got it, that person meant to do the changes in the internal code of Spark, which is not something ideal, so @zero323, if you would like, I would be more than happy to see an answer from you, summarizing what you have in the linked answer of yours and the fact that it runs for you. – gsamaras Sep 02 '16 at 18:01
  • I've found that this doesn't work on big data. About to explore Mahout and then maybe move away from hadoop to see what cpp packages there might be. –  Jan 06 '17 at 22:32
  • @JulianCienfuegos as my question shows, I experienced problems as well with Big Data. However, I don't think the problem is with Hadoop and Spark, but with the particular implementation of k-means. – gsamaras Jan 07 '17 at 12:53

2 Answers


I think the 'hanging' is because your executors keep dying. As I mentioned in a side conversation, this code runs fine for me, locally and on a cluster, in PySpark and Scala. However, it takes a lot longer than it should. Almost all of the time is spent in k-means|| initialization.
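To illustrate why k-means|| initialization can dominate the run time, here is a minimal pure-Python sketch of its oversampling phase. It is not Spark's implementation: the function names are hypothetical, the oversampling factor of 2k is an assumption made for illustration, and the final phase (weighting the candidates and reducing them to k centers) is omitted. The point is that every step makes a full pass over the data against a growing candidate set.

```python
import random


def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def cost_to_nearest(point, centers):
    """Squared distance from a point to its closest current candidate."""
    return min(squared_distance(point, c) for c in centers)


def kmeans_parallel_candidates(points, k, steps, oversampling=None, seed=0):
    """Oversampling phase of k-means||: returns the candidate centers."""
    rng = random.Random(seed)
    # Assumption for illustration: expect about 2k new candidates per step.
    ell = 2 * k if oversampling is None else oversampling
    centers = [rng.choice(points)]
    for _ in range(steps):
        # Full pass over the data: each point is compared with every candidate.
        costs = [cost_to_nearest(p, centers) for p in points]
        total = sum(costs)
        if total == 0:
            break
        # Sample each point independently, proportionally to its cost.
        new = [p for p, c in zip(points, costs)
               if rng.random() < min(1.0, ell * c / total)]
        centers.extend(new)
    return centers


data_rng = random.Random(1)
points = [(data_rng.random(), data_rng.random()) for _ in range(200)]

few = kmeans_parallel_candidates(points, k=8, steps=2)
many = kmeans_parallel_candidates(points, k=8, steps=5)
print(len(few), len(many))  # more steps means more candidates and more passes
```

In this sketch the work per step grows with both the number of points and the number of candidates collected so far, which is why a large `k` combined with many steps gets expensive.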

I opened https://issues.apache.org/jira/browse/SPARK-17389 to track two main improvements, one of which you can use now. Edit: really, see also https://issues.apache.org/jira/browse/SPARK-11560

First, there are some code optimizations that would speed up the init by about 13%.

However, most of the issue is that it defaults to 5 steps of k-means|| init, when it seems that 2 is almost always just as good. You can set the initialization steps to 2 to see a speedup, especially in the stage that's hanging now.

In my (smaller) test on my laptop, init time went from 5:54 to 1:41 with both changes, mostly due to setting init steps.
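A minimal sketch of how the initialization steps could be set from PySpark, assuming a version whose `pyspark.mllib.clustering.KMeans.train` accepts the `initializationSteps` keyword. The helper names `kmeans_options` and `train_kmeans` are hypothetical, and the Spark import is deferred so the option helper can be inspected without a Spark installation.

```python
def kmeans_options(k, init_steps=2, max_iterations=10):
    """Keyword arguments for KMeans.train with fewer k-means|| init steps."""
    return {
        "k": k,
        "maxIterations": max_iterations,
        "initializationMode": "k-means||",
        "initializationSteps": init_steps,
    }


def train_kmeans(data, k, init_steps=2):
    """Train on an RDD of vectors; requires a working PySpark installation."""
    # Imported lazily so that kmeans_options() works without Spark.
    from pyspark.mllib.clustering import KMeans
    return KMeans.train(data, **kmeans_options(k, init_steps))
```

With the minimal example from the question, this would be called as `train_kmeans(data, 8192)` after creating `data` with `RandomRDDs.uniformVectorRDD`.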

Sean Owen

If your RDD is very large, collectAsMap will attempt to copy every single element in the RDD onto the single driver program, which then runs out of memory and crashes. Even though you have partitioned the data, collectAsMap sends everything to the driver and your job crashes. You can cap the number of elements returned by calling take or takeSample, or by filtering or sampling your RDD. Similarly, be cautious with these other actions unless you are sure your dataset is small enough to fit in memory:

countByKey, countByValue, collect

If you really do need every value of the RDD and the data is too big to fit into memory, you could write the RDD out to files or export it to a database that is large enough to hold all the data. Since you are using an API, I think you are not able to do that (rewrite all the code, maybe? increase memory?). I think this collectAsMap in the runAlgorithm method is a really bad thing in KMeans (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html)...
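For your own code (as opposed to Spark's internals, which you cannot change from the API), capping what reaches the driver could look like the sketch below. It relies only on the standard RDD actions `take` and `takeSample`; the helper names and the default limits are hypothetical.

```python
def first_rows(rdd, limit=100):
    """Bring at most `limit` elements to the driver instead of collect()."""
    return rdd.take(limit)


def bounded_sample(rdd, limit=100, seed=42):
    """Bring a random sample of at most `limit` elements to the driver."""
    # takeSample(withReplacement, num, seed) bounds the size of the result.
    return rdd.takeSample(False, limit, seed)
```

Either helper keeps the driver-side result bounded by `limit`, regardless of how large the RDD is.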

Carol