
I am trying to run KMeans clustering using Apache Spark's MLlib:

val data = sc.textFile(irisDatasetString)
val parsedData = data.map(_.split(',').map(_.toDouble)).cache()

val clusters = KMeans.train(parsedData,3,numIterations = 20)

which gives the following error:

error: overloaded method value train with alternatives:
  (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int,runs: Int)org.apache.spark.mllib.clustering.KMeansModel <and>
  (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int)org.apache.spark.mllib.clustering.KMeansModel <and>
  (data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int,runs: Int,initializationMode: String)org.apache.spark.mllib.clustering.KMeansModel
 cannot be applied to (org.apache.spark.rdd.RDD[Array[Double]], Int, numIterations: Int)
       val clusters = KMeans.train(parsedData,3,numIterations = 20)

So I tried converting the Array[Double] to a Vector, as shown here:

scala> val vectorData: Vector = Vectors.dense(parsedData)

which gave the following error:

error: type Vector takes type parameters
   val vectorData: Vector = Vectors.dense(parsedData)
                   ^
error: overloaded method value dense with alternatives:
  (values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
  (firstValue: Double,otherValues: Double*)org.apache.spark.mllib.linalg.Vector
 cannot be applied to (org.apache.spark.rdd.RDD[Array[Double]])
       val vectorData: Vector = Vectors.dense(parsedData)

So I am inferring that `org.apache.spark.rdd.RDD[Array[Double]]` is not the same as `Array[Double]`.
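
To check my reading of the error, `Vectors.dense` does accept a single row (a minimal sketch; the sample values are just one made-up iris-like row):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// One parsed CSV row: this is what each element of parsedData looks like.
val row: Array[Double] = Array(5.1, 3.5, 1.4, 0.2)

// Vectors.dense works on a single Array[Double]...
val vec: Vector = Vectors.dense(row)

// ...but parsedData is an RDD[Array[Double]]: a distributed collection of
// such rows, not one array, so Vectors.dense(parsedData) cannot compile.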

How can I proceed with my data as `org.apache.spark.rdd.RDD[Array[Double]]`? Or how can I convert `org.apache.spark.rdd.RDD[Array[Double]]` to `Array[Double]`?


1 Answer


`KMeans.train` is expecting an `RDD[Vector]` instead of an `RDD[Array[Double]]`. It seems to me that all you need to do is change

val parsedData = data.map(_.split(',').map(_.toDouble)).cache()

to

val parsedData = data.map(x => Vectors.dense(x.split(',').map(_.toDouble))).cache()
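
For reference, a minimal self-contained version (a sketch assuming the RDD-based MLlib API of Spark 1.x; `irisDatasetString` is the path variable from the question):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile(irisDatasetString)

// Parse each CSV line into a dense MLlib vector, giving an RDD[Vector].
val parsedData = data
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .cache()

// k = 3 clusters, at most 20 iterations. Note that the parameter is named
// maxIterations (see the signatures in the error above), so the named
// argument numIterations = 20 would not compile; pass it positionally.
val clusters = KMeans.train(parsedData, 3, 20)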
  • No, that is not working. I get the following error now: `error: missing parameter type for expanded function ((x$1) => x$1.split(',').map(((x$2) => x$2.toDouble)))` for `val parsedData = data.map(Vectors.dense(_.split(',').map(_.toDouble))).cache()` – sand Jan 09 '15 at 05:08
  • I tried that too. So I get parsedData of type `org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]`, which I then try to convert to a Vector using `val dataArray = parsedData.collect; val dataVector = Vectors.dense(dataArray)`, which does not work out either, because my dataArray is `Array[org.apache.spark.mllib.linalg.Vector]` and `Vectors.dense` expects an `Array[Double]` – sand Jan 12 '15 at 11:19
  • Why do you want the `RDD[Vector]` to be a single vector? `KMeans.train` is expecting an `RDD[Vector]` (see the sketch below this thread). – Mike Park Jan 12 '15 at 16:15
  • You are right :) For some reason I thought I had to collect the data and then pass it to k-means. Your solution works :) Thanks. – sand Jan 14 '15 at 07:14
  • Hey Climbage, how do you write the same thing in pyspark? I am trying to get multi variate statistics on my data present in CSV file. The function is expecting RDD[Vectors]. I don't know how to get them – SRS Jul 02 '15 at 16:06
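
As the thread above concludes, the `RDD[Vector]` goes into `KMeans.train` as-is; nothing needs to be collected to the driver. A minimal sketch of using the trained model (assuming the Spark 1.x MLlib `KMeansModel` API; `parsedData` is the `RDD[Vector]` from the answer):

// Train directly on the distributed data; no collect() needed.
val model = KMeans.train(parsedData, 3, 20)

// Cluster assignment for every point, still as an RDD.
val assignments = model.predict(parsedData)   // RDD[Int]

// Within-set sum of squared errors, a rough measure of clustering quality.
val wssse = model.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $wssse")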