
Given a large file containing data of the form (V1,V2,...,VN):

2,5
2,8,9
2,5,8
...

I am trying to produce a list of pair counts like the following, using Spark:

((2,5),2)
((2,8),2)
((2,9),1)
((8,9),1)
((5,8),1)

I tried the suggestions mentioned in response to an older question, but ran into some issues. For example:

val dataRead = sc.textFile(inputFile)
val itemCounts = dataRead
  .flatMap(line => line.split(","))
  .map(item => (item, 1))
  .reduceByKey((a, b) => a + b)
  .cache()
val nums = itemCounts.keys
  .filter({case (a) => a.length > 0})
  .map(x => x.trim.toInt)
val pairs = nums.flatMap(x => nums.map(y => (x,y)))

I got the error,

scala> val pairs = nums.flatMap(x => nums.map(y => (x,y)))
<console>:27: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(Int, Int)]
 required: TraversableOnce[?]
       val pairs = nums.flatMap(x => nums.map(y => (x,y)))
                                             ^
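
From what I can tell, the function passed to flatMap must return a local collection (a TraversableOnce), whereas the inner nums.map(...) returns another RDD, and RDD transformations apparently cannot be nested like this. A cross product at the RDD level would be cartesian, though that pairs values across the whole file rather than within each line, which is not quite what I want:

// Compiles, but pairs every value with every other value across the entire file,
// not just within a single line
val allPairs = nums.cartesian(nums)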

Could someone please point out what I am doing incorrectly, or suggest a better way to achieve this? Many thanks in advance.

user799188

4 Answers


You can use the combinations method of Array to achieve this.

val dataRead = sc.textFile(inputFile)
// "2,5"
// "2,8,9"
// "2,5,8" 
//  ...

val combinations = dataRead.flatMap { line =>
        line.split(",")          // "2,8,9" => Array("2", "8", "9")
            .map(_.trim.toInt)   // => Array(2, 8, 9)
            .combinations(2)     // Iterator over all 2-element combinations
            .toSeq               // ~ Seq(Array(2, 8), Array(2, 9), Array(8, 9))
            .map { case Array(a, b) => a -> b }  // Seq((2,8), (2,9), (8,9))
}

// Array((2,5), (2,8), (2,9), (8,9), (2,5), (2,8), (5, 8), ...)

val result = combinations.map(item => item -> 1) // Array(((2,5),1), ((2,9),1), ...)
                         .reduceByKey(_ + _)   
// Array(((2,5),2), ((2,8),2), ((2,9),1), ((8,9),1), ((5,8),1) ....) 
// order may be different.
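
For reference, this is what combinations(2) produces on a plain Scala collection (note that the pairs follow the element order within each line, so the values are assumed to be listed consistently, e.g. in ascending order):

// Quick local check of combinations(2), outside Spark
Array(2, 8, 9).combinations(2).foreach(c => println(c.mkString("(", ",", ")")))
// (2,8)
// (2,9)
// (8,9)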
Shyamendra Solanki

I'm not sure this is exactly what you need: I extract pairs of numbers from each line with a sliding window, so from the line 2,8,9 I extract two pairs, (2, 8) and (8, 9). If you need a different pair extraction, you will have to replace sliding(2) with something else.

import org.apache.spark.rdd.RDD

val dataRead = sc.textFile(this.getClass.getResource("/text.txt").getFile)

// Extract adjacent pairs of values from each line
val tuples: RDD[(Int, Int)] = dataRead.flatMap(_.split(",").sliding(2)).map {
  case Array(l, r) => (l.toInt, r.toInt)
}

val count = tuples.countByValue()

count.foreach(println)

Output

((2,5),2)
((8,9),1)
((5,8),1)
((2,8),1)
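
Note that sliding(2) only yields adjacent pairs, which is why (2,9) does not appear in this output, unlike in the expected result in the question. A quick local check of the behaviour:

// sliding(2) gives only adjacent windows, not all pairs
Array(2, 8, 9).sliding(2).foreach(c => println(c.mkString("(", ",", ")")))
// (2,8)
// (8,9)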
Eugene Zhulenev

Basically, you are trying to do a WordCount with (Int, Int) keys instead of the String keys used in the common example.

So the aim here is to convert your lines into (Int, Int) tuples:

val pairs = sc.textFile(inputFile)
              .map(line => line.split(","))
              .flatMap(a => a.sliding(2))
              .map(a => (a(0).toInt, a(1).toInt) -> 1)
              .reduceByKey(_ + _)
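
To inspect the result on the driver (assuming the counts fit in memory), something like the following should do; the ordering of the output is not guaranteed:

// Bring the pair counts back to the driver and print them
pairs.collect().foreach(println)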
Jean Logeart

You can extract the pairs with a sliding window of size 2. Some lines may contain only a single value, so you need a matching case for that in the map function (see the quick check after the code).

val mapRdd = sc.textFile("inputFile.csv")
  .map { line => line.split(",") }             // split each comma-separated line into its values
  .flatMap { wordList => wordList.sliding(2) } // adjacent pairs (or one partial window for single-value lines)
  .map {
    case Array(word1, word2) => ((word1, word2), 1)
    case Array(word1)        => ((word1, ""), 1)
  }
  .reduceByKey(_ + _)

println("===================================================")
mapRdd.foreach { li =>
  println(li._1.toString() + " ---> " + li._2)
}
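
As a quick check of why the single-element case is needed: sliding(2) returns one shorter, partial window when a sequence has fewer than two elements, so a one-value line still produces a window of size 1:

// Local check: sliding(2) on a single-element array yields one window of size 1
Array("2").sliding(2).foreach(w => println(w.mkString("Array(", ", ", ")")))
// Array(2)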
Vivek Narayanasetty