
I have a dataframe as follows:

+------+-------+
| key  | label |
+------+-------+
| key1 | a     |
| key1 | b     |
| key2 | a     |
| key2 | a     |
| key2 | a     |
+------+-------+

I want a modified version of countByKey in Spark which returns output as follows:

+------+-------+
| key  | count |
+------+-------+
| key1 |     0 |
| key2 |     3 |
+------+-------+
Explanation: if all labels under a key are the same, return the count of all rows under that key; otherwise the count for that key is 0.

My approach to solve this problem:

Steps:

  1. reduceByKey(): concatenate all labels (treating labels as strings) to get pairs of the form <key, concat_of_all_labels>
  2. mapValues(): parse each string character by character to check whether all characters are the same. If they are, return the number of labels; otherwise return 0 (rough sketch below).
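
A rough sketch of what I have in mind (assuming, as in the example above, that every label is a single character, so the concatenated string can be checked character by character):

val labelled = sc.parallelize(Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")))

// Step 1: concatenate all labels per key
val concatenated = labelled.reduceByKey(_ + _)          // (key, concat_of_all_labels)

// Step 2: if all characters are identical, return the number of labels, else 0
val counts = concatenated.mapValues(s => if (s.forall(_ == s.head)) s.length else 0)

// counts.collect() => Array((key1,0), (key2,3))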

I am new to Spark and I feel there should be a more efficient way to do this. Is there a better way to get this task done?

– Hi Hello

3 Answers


It's quite straightforward: get both count and distinct count by key, then it's just a simple case when ... then ...

// Assumes a SparkSession named spark (as in spark-shell)
import org.apache.spark.sql.functions._   // count, countDistinct, when, typedLit
import spark.implicits._                  // toDF and the 'symbol column syntax

val df = Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")).toDF("key", "label")
df.groupBy('key)
  .agg(countDistinct('label).as("cntDistinct"), count('label).as("cnt"))
  .select('key, when('cntDistinct === 1, 'cnt).otherwise(typedLit(0)).as("count"))
  .show

+----+-----+
| key|count|
+----+-----+
|key1|    0|
|key2|    3|
+----+-----+
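
For reference, the same case when ... then ... can also be written in Spark SQL (a sketch; the temporary view name labels is just illustrative):

df.createOrReplaceTempView("labels")
spark.sql("""
  SELECT key,
         CASE WHEN COUNT(DISTINCT label) = 1 THEN COUNT(label) ELSE 0 END AS count
  FROM labels
  GROUP BY key
""").show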
– Kombajn zbożowy
  • Thanks for answering! In Spark's guide [here](https://spark.apache.org/docs/latest/rdd-programming-guide.html), groupByKey is said to be slower than reduceByKey. Wouldn't it cause some performance issues? – Hi Hello Apr 13 '19 at 15:32
  • Well, that's for RDD. [Dataset](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) doesn't even have reduceByKey. I'm relatively new too, so I can't say for sure, but based on [this answer](https://stackoverflow.com/a/32903568/2890093) I can guess that it should be okay. – Kombajn zbożowy Apr 13 '19 at 16:50
  • There is no way to do aggregations using `reduceByKey`. – Sai Apr 13 '19 at 17:43
  • That seems to be true. Couldn't find any reduceByKey in the DataFrame API. Thanks for the answer. – Hi Hello Apr 13 '19 at 17:52

Adding to the previous solution: using reduceByKey is more efficient if your data is really big and you care about parallelism.

If your data is big and you want to reduce shuffling (groupBy can cause a shuffle), here is another solution using the RDD API and reduceByKey, which combines values within each partition before shuffling:

import org.apache.spark.rdd.PairRDDFunctions

val mockedRdd = sc.parallelize(Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")))

// Converting to a PairRDD (the implicit conversion on RDDs of pairs would also do this)
val pairRDD = new PairRDDFunctions[String, String](mockedRdd)

// Map each label to (Set(label), 1), then reduce by key: union the label sets and sum the counts
val reducedRDD = pairRDD.mapValues(v => (Set(v), 1)).
     reduceByKey((v1, v2) => (v1._1 ++ v2._1, v1._2 + v2._2))

scala> val result = reducedRDD.collect()
result: Array[(String, (scala.collection.immutable.Set[String], Int))] = Array((key1,(Set(a, b),2)), (key2,(Set(a),3)))

Now the final result has the following format (key, (set(labels), count)):

Array((key1,(Set(a, b),2)), (key2,(Set(a),3)))

Now, after you collect the results on the driver, you can simply accept counts from sets that contain only one label:

// Keep only the keys whose label set has exactly one element
scala> result.filter(elm => elm._2._1.size == 1)
res15: Array[(String, (scala.collection.immutable.Set[String], Int))]
              = Array((key2,(Set(a),3)))
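
If you want the exact output shape from the question (mixed-label keys reported as 0 rather than dropped), a minimal variation on the same idea:

// Keep every key; report the row count only when all of its labels are identical, otherwise 0
val finalCounts = reducedRDD.mapValues { case (labels, cnt) => if (labels.size == 1) cnt else 0 }

// finalCounts.collect() => Array((key1,0), (key2,3))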

Analysis using Spark 2.3.2

1) Analysing the (DataFrame API) groupBy Solution

I am not really a Spark expert, but I will throw in my two cents here :)

Yes, DataFrame and SQL queries go through the Catalyst Optimizer, which can possibly optimize a groupBy.

The groupBy approach proposed above using the DataFrame API generates the following physical plan, obtained by running explain(true) on the aggregation query:
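
A sketch of that call, reusing the aggregation from the first answer (the val#15 columns in the plan suggest the original DataFrame named its second column val rather than label):

// explain(true) on the bare df would only show the scan; explain the full aggregation query instead
df.groupBy('key)
  .agg(countDistinct('label).as("cntDistinct"), count('label).as("cnt"))
  .select('key, when('cntDistinct === 1, 'cnt).otherwise(typedLit(0)).as("count"))
  .explain(true)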

== Physical Plan ==
*(3) HashAggregate(keys=[key#14], functions=[count(val#15), count(distinct val#15)], output=[key#14, count#94L])
+- Exchange hashpartitioning(key#14, 200)
   +- *(2) HashAggregate(keys=[key#14], functions=[merge_count(val#15), partial_count(distinct val#15)], output=[key#14, count#105L, count#108L])
      +- *(2) HashAggregate(keys=[key#14, val#15], functions=[merge_count(val#15)], output=[key#14, val#15, count#105L])
         +- Exchange hashpartitioning(key#14, val#15, 200)
            +- *(1) HashAggregate(keys=[key#14, val#15], functions=[partial_count(val#15)], output=[key#14, val#15, count#105L])
               +- *(1) Project [_1#11 AS key#14, _2#12 AS val#15]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#11, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#12]
                     +- Scan ExternalRDDScan[obj#10]

Note that the job has been split into three stages and has two exchange phases. It is also worth mentioning that the second hashpartitioning exchange uses a different set of keys (key, val), which IMO will cause a shuffle in this case, since partitions hashed by (key, val) will not necessarily be co-located with partitions hashed by (key) only.

Here is the plan as visualized in the Spark UI:

[Spark UI DAG: groupBy solution]

2) Analysing the RDD API Solution

By running reducedRDD.toDebugString, we will get the following plan:

scala> reducedRDD.toDebugString
res81: String =
(8) ShuffledRDD[44] at reduceByKey at <console>:30 []
 +-(8) MapPartitionsRDD[43] at mapValues at <console>:29 []
    |  ParallelCollectionRDD[42] at parallelize at <console>:30 []

Here is the plan as visualized in the Spark UI:

[Spark UI DAG: RDD API approach]

You can clearly see that the RDD approach generates fewer stages and tasks and performs only one shuffle (the reduceByKey exchange), compared with the two exchanges in the DataFrame plan. This alone suggests that the RDD approach consumes fewer resources and less time for this job.

Conclusion: at the end of the day, how much optimization you apply really depends on your business requirements and on the size of the data you are dealing with. If you don't have big data, the groupBy approach is the straightforward option; otherwise parallelism, speed, shuffling and memory management matter, and most of the time you can weigh them by analyzing query plans and examining your jobs in the Spark UI.

– aelbuni
  • Thanks for the answer. But there is no `reduceByKey` in the DataFrame API, only `groupByKey`. Doesn't Spark automatically optimize `groupByKey` queries in such a case, to minimize shuffling? – Hi Hello Apr 13 '19 at 18:10
  • That is a really good question that kept me wondering how smart the Catalyst Optimizer would be, and it eventually made me run a performance evaluation experiment whose results I attached to my answer. Please check my edited answer. Regards – aelbuni Apr 13 '19 at 21:13
  • Think of RDDs as an abstraction layer, comparable to assembler or machine code in systems programming (evolution cycle: RDD => DataFrame => Dataset). An RDD expresses how to do something, not just what should be achieved, so there is little room for optimizers at the RDD level. – aelbuni Apr 14 '19 at 01:04
scala> val rdd = sc.parallelize(Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")))

scala> val grpby = rdd.groupByKey()

scala> val mp = grpby.map( line => (line._1, line._2.toList.length, line._2.toSet.size))
                     .map { case (a, b, c) => (a, if (c != 1) 0 else b) }

scala> val finres = mp.toDF("key", "count")

scala> finres.show
+----+-----+
| key|count|
+----+-----+
|key1|    0|
|key2|    3|
+----+-----+
– thebluephantom