
I have a dataset that contains DocID, WordID and frequency (count), as shown below. Note that the first three lines are header values: 1. the number of documents, 2. the number of words in the vocabulary, and 3. the total number of words in the collection.

189
1430
12300
1 2 1
1 39 1
1 42 3
1 77 1
1 95 1
1 96 1
2 105 1
2 108 1
3 133 3

What I want to do is read the data (ignoring the first three lines), group the words by document, and finally represent each document as a vector containing the frequency of each WordID.

Based on the above dataset, the representations of documents 1, 2 and 3 will be as follows (note that vocab_size can be extracted from the second line of the data):

val data = Array(
    Vectors.sparse(vocab_size, Seq((2, 1.0), (39, 1.0), (42, 3.0), (77, 1.0), (95, 1.0), (96, 1.0))),
    Vectors.sparse(vocab_size, Seq((105, 1.0), (108, 1.0))),
    Vectors.sparse(vocab_size, Seq((133, 3.0))))

The problem is that I am not quite sure how to read the .txt.gz file as an RDD and create an Array of sparse vectors as described above. Please note that I actually want to pass the data array to the PCA transformer.

Giorgos Myrianthous
    Just read with `textFiles` and parse in `map`. See: http://stackoverflow.com/questions/42761912/how-to-read-gz-compressed-file-by-pyspark – T. Gawęda Apr 03 '17 at 12:17
  • The problem essentially is how to combine the words per document and put that representation in a sparse vector. – Giorgos Myrianthous Apr 03 '17 at 12:20

1 Answer


Something like this should do the trick:

import org.apache.spark.mllib.linalg.Vectors

// textFile decompresses .gz files transparently; the three header lines
// contain a single token each, so they fall through to `case _` and are dropped.
sc.textFile("path/to/file").flatMap(r => r.split(' ') match {
  case Array(doc, word, freq) => Some((doc.toInt, (word.toInt, freq.toDouble)))
  case _ => None
}).groupByKey().mapValues(a => Vectors.sparse(vocab_size, a.toSeq))

Note that groupByKey will load all the values for each key into memory, so you might want to use one of its variants, reduceByKey or aggregateByKey, instead (I would have, but I don't know which methods your sparse vectors provide, although you probably have something to merge them together).
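If that becomes an issue, here is a minimal sketch of the aggregateByKey variant, which accumulates the (WordID, frequency) pairs into per-partition buffers before building each vector; `parsed` and `vocab_size` are assumed to be the parsed RDD and the vocabulary size from above:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.mllib.linalg.Vectors

// parsed: RDD[(Int, (Int, Double))], i.e. (docId, (wordId, freq)) as produced above.
val docVectors = parsed
  .aggregateByKey(ArrayBuffer.empty[(Int, Double)])(
    (buf, pair) => buf += pair,  // fold one (wordId, freq) pair into a partition-local buffer
    (b1, b2)    => b1 ++= b2     // merge buffers coming from different partitions
  )
  .mapValues(buf => Vectors.sparse(vocab_size, buf))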

Cyrille Corpet
  • You should change `sc.textFiles` to `sc.textFile` and `freq.toDouble` to `count.toDouble` (or vice-versa). Once I correct those typos, I get the following error: `Main.scala:46: overloaded method value sparse with alternatives:` – Giorgos Myrianthous Apr 03 '17 at 12:50
  • Where did you get your `Vectors` object from? It is neither in `scala.collection`, nor in apache-spark. It would help to understand how it can be used. – Cyrille Corpet Apr 03 '17 at 12:53
  • Ignore my previous observation. It works OK however, the problem is that it is not in the form `Array[Vector]`. What I actually want to do is (given that the output of your code is stored in `data`): `val dataRDD = sc.parallelize(data)` then, `val mat: RowMatrix = new RowMatrix(dataRDD)` and finally perform PCA: `val pc: Matrix = mat.computePrincipalComponents(4)` – Giorgos Myrianthous Apr 03 '17 at 13:02
  • Why do you need it in an `Array`? The result of my code is already in an `RDD`, as `sc.parallelize` would give you if you did it locally beforehand. Consider that my result is already `dataRDD` (or maybe take only its `values` if you don't care about the `docId`). – Cyrille Corpet Apr 03 '17 at 13:13
  • Right, but trying to construct a `RowMatrix` from the `dataRDD`, will give: `type mismatch; found : org.apache.spark.rdd.RDD[(Int, org.apache.spark.mllib.linalg.Vector)] required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]` – Giorgos Myrianthous Apr 03 '17 at 13:14
  • So do `dataRDD.values` – Cyrille Corpet Apr 03 '17 at 13:16
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/139775/discussion-between-old-school-and-cyrille-corpet). – Giorgos Myrianthous Apr 03 '17 at 13:19
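Putting the answer and the comment thread together, here is a hedged end-to-end sketch; the file path and the choice of 4 principal components are taken from the discussion above, and it assumes, as in the question's example, that WordIDs are valid 0-based indices into vocab_size:

import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val lines = sc.textFile("path/to/file.txt.gz")

// The second header line holds the vocabulary size.
val vocab_size = lines.take(3)(1).trim.toInt

val dataRDD = lines.flatMap(r => r.split(' ') match {
    case Array(doc, word, freq) => Some((doc.toInt, (word.toInt, freq.toDouble)))
    case _ => None // header lines have a single token and are skipped
  })
  .groupByKey()
  .mapValues(a => Vectors.sparse(vocab_size, a.toSeq))
  .values // RowMatrix needs RDD[Vector], so drop the docId

val mat: RowMatrix = new RowMatrix(dataRDD)
val pc: Matrix = mat.computePrincipalComponents(4)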