0

I am using Spark on the cluster which I am sharing with others users. So it is not reliable to tell which one of my code runs more efficient just based on the running time. Because when I am running the more efficient code, someone else maybe running huge data works and makes my code executes for a longer time.

So can I ask 2 questions here:

  1. I was using join function to join 2 RDDsand I am trying to use groupByKey() before using join, like this:

    rdd1.groupByKey().join(rdd2)
    

    seems that it took longer time, however I remember when I was using Hadoop Hive, the group by made my query ran faster. Since Spark is using lazy evaluation, I am wondering whether groupByKey before join makes things faster

  2. I have noticed Spark has a SQL module, so far I really don't have time to try it, but can I ask what are the differences between the SQL module and RDD SQL like functions?

zero323
  • 322,348
  • 103
  • 959
  • 935
Cherry Wu
  • 3,844
  • 9
  • 43
  • 63
  • 1
    In the future please don't post two questions at the same time. – zero323 Oct 26 '15 at 11:07
  • 1
    I don't think it's too late to edit out the second question. @Cherry Wu, would you be okay with that? – Daniel Darabos Oct 26 '15 at 12:55
  • do you expect duplicate keys in both in rdd1 and rdd2? – vefthym May 05 '17 at 09:29
  • @DanielDarabos, I don't mind if you add the second question. I found Spark SQL description http://spark.apache.org/docs/latest/sql-programming-guide.html, `Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.` Here, I guess "more info" means data is organized like relational data table – Cherry Wu May 05 '17 at 20:40

2 Answers2

5
  1. There is no good reason for groupByKey followed by join to be faster than join alone. If rdd1 and rdd2 have no partitioner or partitioners differ then a limiting factor is simply shuffling required for HashPartitioning.

    By using groupByKey you not only increase a total cost by keeping mutable buffers required for grouping but what is more important you use an additional transformation which results in a more complex DAG. groupByKey + join:

    rdd1 = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
    rdd2 = sc.parallelize([("a", 5), ("c", 6), ("b", 7)])
    rdd1.groupByKey().join(rdd2)
    

    enter image description here

    vs. join alone:

    rdd1.join(rdd2)
    

    enter image description here

    Finally these two plans are not even equivalent and to get the same results you have to add an additional flatMap to the first one.

  2. This is a quite broad question but to highlight the main differences:

    • PairwiseRDDs are homogeneous collections of arbitraryTuple2 elements. For default operations you want key to be hashable in a meaningful way otherwise there are no strict requirements regarding the type. In contrast DataFrames exhibit much more dynamic typing but each column can only contain values from a supported set of defined types. It is possible to define UDT but it still has to be expressed using basic ones.

    • DataFrames use a Catalyst Optimizer which generates logical and physical execution planss and can generate highly optimized queries without need for applying manual low level optimizations. RDD based operations simply follow dependency DAG. It means worse performance without custom optimization but much better control over execution and some potential for fine graded tuning.

Some other things to read:

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
4

I mostly agree with zero323's answer, but I think there is reason to expect join to be faster after groupByKey. groupByKey reduces the amount of data and partitions the data by the key. Both of these help with the performance of a subsequent join.

I don't think the former (reduced data size) is significant. And to reap the benefits of the latter (partitioning) you need to have the other RDD partitioned the same way.

For example:

val a = sc.parallelize((1 to 10).map(_ -> 100)).groupByKey()
val b = sc.parallelize((1 to 10).map(_ -> 100)).partitionBy(a.partitioner.get)
a.join(b).collect

DAG visualization

Daniel Darabos
  • 26,991
  • 10
  • 102
  • 114
  • 2
    It looks like in PySpark it is a difference between union followed by partitioning (`join` alone) vs partitioning followed by union (`groupByKey` + `join` with partitioned `b`). I am not sure if there is something to gain. – zero323 Oct 26 '15 at 13:25
  • 2
    Ah, you're right. But it may be the case that your other RDD is already partitioned the right way, so you don't have to count the cost of that work. At least that's what very often happens for us. – Daniel Darabos Oct 26 '15 at 13:49