2

I have a lot of key-value pairs (key, value).

For each key, I don't want the average of its values or any other aggregation; I just need a single value per key (i.e., get the distinct keys).

Here is an example:

("1","apple")
("1","apple")
("2","orange")
("2","orange")
("1","apple")
("1","pear")

The result could be

("2","orange")
("1","apple")

or

("2","orange")
("1","pear")

I can use reduceByKey((a, b) => a) to get this, but since there are a lot of keys, it takes a very long time.
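
For reference, a minimal sketch of the approach I am using now (assuming an existing SparkContext named sc; the sample data is just for illustration):

// Keep one (arbitrary) value per key.
val pairs = sc.parallelize(Seq(
  ("1", "apple"), ("1", "apple"), ("2", "orange"),
  ("2", "orange"), ("1", "apple"), ("1", "pear")
))
val onePerKey = pairs.reduceByKey((a, b) => a)
onePerKey.collect().foreach(println) // e.g. (1,apple) and (2,orange)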

Does anyone have a better suggestion?

Thanks!

mrsrinivas
Elona Mishmika

3 Answers

1

Yiling, you may use the transformation distinct to keep distinct elements in your RDD. https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.rdd.RDD
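
A minimal sketch of that suggestion (assuming an existing SparkContext sc; see the caveat in the comments below):

// distinct() removes exact duplicate elements. For a pair RDD the whole
// (key, value) tuple is compared, so ("1","apple") and ("1","pear") both remain.
val rdd = sc.parallelize(Seq(("1", "apple"), ("1", "apple"), ("1", "pear")))
rdd.distinct().collect().foreach(println) // prints (1,apple) and (1,pear)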

Wuyang Li
  • Thanks! But it seems distinct only removes identical elements. If I happen to have ("1","apple") and ("1","orange"), I only want one of them to exist in the result. – Elona Mishmika Dec 28 '16 at 10:50
  • Ah, in that case distinct is not the right transformation to use. According to the discussion [how-does-distinct-function-work-in-spark](http://stackoverflow.com/questions/30959955/how-does-distinct-function-work-in-spark), in a tuple RDD the tuple as a whole is considered. – Wuyang Li Dec 28 '16 at 10:57
  • Hi Yiling, if I'm not mistaken, you're actually asking for "distinct by key". Can you modify your question description? You wrote "I assume the same key will have the same value", which is really misleading. – Wuyang Li Dec 28 '16 at 11:13
  • Thanks, I just modified my question. – Elona Mishmika Dec 28 '16 at 12:58
1

This is actually a typical map-reduce-style problem. Since you want only one value per key, you could simply handle it in the reduce phase, although that is not the best way. As you have noticed, using reduceByKey alone costs a lot of time in useless shuffling, which means you should pre-reduce your data on the mapper side. So the answer is obvious: use a combiner.

In Spark you can use combineByKey before your reduceByKey to remove duplicate values.
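
For instance, a rough sketch of keeping one value per key with combineByKey (variable names are illustrative; assumes an existing SparkContext sc):

val pairs = sc.parallelize(Seq(("1", "apple"), ("1", "apple"), ("2", "orange"), ("1", "pear")))

// Every merge step simply keeps the value it already has, so each key ends up with one value.
val onePerKey = pairs.combineByKey(
  (v: String) => v,              // createCombiner: first value seen for a key
  (c: String, v: String) => c,   // mergeValue: ignore later values within a partition
  (c1: String, c2: String) => c1 // mergeCombiners: pick one value across partitions
)

onePerKey.collect().foreach(println) // e.g. (1,apple) and (2,orange)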

==========

Besides the combiner, you can also change the shuffle method. The default shuffle for Spark 1.2+ is sort-based shuffle (SortShuffle). You can change it to hash-based shuffle (HashShuffle), which avoids the cost of sorting keys.

Try setting this in your SparkConf:

spark.shuffle.manager = hash
spark.shuffle.consolidateFiles = true

But pay attention: too many map cores may produce too many shuffle files, which will hurt performance. spark.shuffle.consolidateFiles is used to merge the mapper output files.
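
A sketch of setting these options programmatically on older Spark versions that still support the hash shuffle manager (the app name is just an example):

import org.apache.spark.{SparkConf, SparkContext}

// Switch to hash-based shuffle and merge mapper output files.
val conf = new SparkConf()
  .setAppName("dedup-by-key")
  .set("spark.shuffle.manager", "hash")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)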

Lhfcws
  • Thanks, but as I have a lot of keys, even with a combiner it would still take a lot of time in useless shuffle. I'm thinking about using a hash map... but no idea yet. – Elona Mishmika Dec 28 '16 at 11:05
  • I've updated my answer; you can try hash shuffle instead of the default sort shuffle. But pay attention: too many map cores may produce too many shuffle files, which will hurt performance. – Lhfcws Dec 29 '16 at 12:05
1

You can use dropDuplicates() on a DataFrame.

// toDF needs the SQL implicits in scope:
// import sqlContext.implicits._ on Spark 1.x, or spark.implicits._ on Spark 2.x
import spark.implicits._

val df = sc.parallelize(
  List(
    ("1", "apple"),
    ("1", "apple"),
    ("2", "orange"),
    ("2", "orange"),
    ("1", "apple"),
    ("1", "pear")
  )
).toDF("count", "name")

df.show()
+-----+------+
|count|  name|
+-----+------+
|    1| apple|
|    1| apple|
|    2|orange|
|    2|orange|
|    1| apple|
|    1|  pear|
+-----+------+

Drop duplicates by name:

val uniqueDf = df.dropDuplicates("name")

Now pick the top 2 unique rows:

uniqueDf.limit(2).show()

+-----+------+
|count|  name|
+-----+------+
|    2|orange|
|    1| apple|
+-----+------+

Unique records without a limit:

uniqueDf.show()
+-----+------+
|count|  name|
+-----+------+
|    2|orange|
|    1| apple|
|    1|  pear|
+-----+------+

Edit:

You can use collect() on the DataFrame to bring the values back to the driver as a local collection.
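
For example, a small sketch of that (the column positions follow the schema above):

// Bring the de-duplicated rows back to the driver as (count, name) pairs.
val localPairs: Array[(String, String)] =
  uniqueDf.collect().map(row => (row.getString(0), row.getString(1)))

localPairs.foreach(println)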

mrsrinivas