
I have data of this type:

 column1 column2 int 
 ((a,b),1)
 ((a,c),1)
 ((k,a),1)

I need two kinds of results. First, the total per column1 value over all column2 values:

(a,total)

and second, the total per column1 value, counting only rows where the column1 value is not equal to the column2 value:

(a,total)

How can I use reduceByKey for this type of data?

my code:

val data = sc.textFile("tttt.tsv")
val satir = data.map { line =>
  val cols = line.split("\t")  // split once instead of twice per line
  ((cols(1), cols(2)), 1)
}
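Assuming each line of the TSV has at least three tab-separated fields, with the pair in the second and third columns (the first field is a made-up placeholder here), the parsing step can be checked on a plain string without Spark:

```scala
// Plain-Scala check of the parsing logic above (no SparkContext needed).
// "0" as the first field is only a hypothetical example value.
val line = "0\ta\tb"
val cols = line.split("\t")
val pair = ((cols(1), cols(2)), 1)  // same element shape as satir
println(pair)                        // ((a,b),1)
```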

--- About the second scenario

My example data is:

 column1 column2 int
 ((a,b),1)
 ((a,c),1)
 ((a,a),1)
 ((a,d),1)

In the second scenario I need reduceByKey over only the rows where column1 is not equal to column2.

For example, with my example data: ((a,b),1) + ((a,c),1) + ((a,d),1) gives (a,3).

– ardaore

2 Answers


For the first scenario you can use this:

val arrangedDF = satir.map(pairData => (pairData._1._1, (pairData._1._2, 1)))
// reduceByKey's function must return the value type, here (String, Int),
// so drop the column2 part first and then sum the counts
val result1DF = arrangedDF.mapValues(_._2).reduceByKey(_ + _)

Here I rearranged the representation of the data: I separated out the key and put the rest of the data into its own tuple, so you can apply reduceByKey directly.

For the second scenario you can use this:

val result2DF = arrangedDF
  .filter(pairData => pairData._1 != pairData._2._1)
  .mapValues(_._2)
  .reduceByKey(_ + _)

Here, for the second scenario, I reused arrangedDF and applied a filter over it for the condition you wanted (the column1 value not equal to the column2 value), and then applied reduceByKey.
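As a sanity check, the same logic for both scenarios can be run on plain Scala collections (no Spark needed), mimicking reduceByKey with groupBy and sum:

```scala
// Same data as the question, as a plain Scala list
val satir = List((("a", "b"), 1), (("a", "c"), 1), (("a", "a"), 1), (("a", "d"), 1))

// Scenario 1: total per column1 value
val total = satir
  .map { case ((c1, _), n) => (c1, n) }
  .groupBy(_._1)
  .map { case (k, vs) => (k, vs.map(_._2).sum) }
// total == Map(a -> 4)

// Scenario 2: drop rows where column1 == column2 first
val totalNe = satir
  .filter { case ((c1, c2), _) => c1 != c2 }
  .map { case ((c1, _), n) => (c1, n) }
  .groupBy(_._1)
  .map { case (k, vs) => (k, vs.map(_._2).sum) }
// totalNe == Map(a -> 3)
```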

Hope my answer was clear.

Thanks

– Akash Sethi
  • Hi, thank you so much for your answer. When I tried it in the Scala shell I got: `error: type mismatch; found: Int, required: (String, Int)` on `val result1DF = arrangedDF.reduceByKey((x,y) => x._2 + y._2)` – ardaore May 15 '17 at 08:41

If I understand your question correctly, below is one way to get what you're asking:

val rdd = sc.parallelize(Seq(
  (("a", "b"), 1),
  (("a", "c"), 1),
  (("a", "d"), 1),
  (("a", "a"), 1),
  (("k", "k"), 1),
  (("k", "a"), 1),
  (("k", "b"), 1)
))

val rdd1 = rdd.map{ case ((x, y), c) => (x, c) }.
  reduceByKey(_ + _)

scala> rdd1.collect.foreach(println)
(a,4)
(k,3)

val rdd2 = rdd.filter{ case ((x, y), c) => x != y }.
  map{ case ((x, y), c) => (x, c) }.
  reduceByKey(_ + _)

scala> rdd2.collect.foreach(println)
(a,3)
(k,2)
– Leo C
  • Hi Leo C, thank you so much for your answer. I now have the two correct RDDs. Do you have an idea how I can join these two RDDs? (For example, given (a,4) from the first and (a,3) from the second, how can I get (a,4,3)?) – ardaore May 15 '17 at 08:43
  • Glad that it helps. You can convert the RDDs to DataFrames via `toDF("col1", "col2")` then `df1.join(df2, "col1")`. Or you can join the RDDs like [this](http://stackoverflow.com/questions/27437507/join-two-ordinary-rdds-with-without-spark-sql). – Leo C May 15 '17 at 15:36
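The RDD join mentioned in the last comment can be sketched on plain Scala maps to show the resulting shape; with Spark it would be along the lines of `rdd1.join(rdd2).map { case (k, (t, n)) => (k, t, n) }` (a sketch of the same join logic, not run against a cluster):

```scala
// Results of the two aggregations above, as plain maps
val totals   = Map("a" -> 4, "k" -> 3)  // rdd1 results
val neTotals = Map("a" -> 3, "k" -> 2)  // rdd2 results

// Inner join on the key, flattened to (key, total, neTotal)
val joined = totals.collect {
  case (k, t) if neTotals.contains(k) => (k, t, neTotals(k))
}.toList.sorted
// joined == List((a,4,3), (k,3,2))
```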