
I am trying to evaluate Spark SQL for some data manipulation queries. The scenario I'm interested in is this:

table1: key, value1, value2
table2: key, value3, value4

create table table3 as
select * from table1 join table2 on table1.key = table2.key

It sounds like I should be able to create the table1 and table2 RDDs (but I don't see a very obvious example of that in the docs). But the bigger question is this -- if I have successfully partitioned the two table RDDs by key and then go to join them with Spark SQL, will it be smart enough to take advantage of the partitioning? And if I create a new RDD as a result of that join, will it also be partitioned? In other words, will it be completely shuffle-free? I would really appreciate pointers to documentation and/or examples on these subjects.
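For concreteness, here is a rough sketch of what I have in mind, using the DataFrame/temp view API (assuming a Spark 2.x SparkSession named spark; the table and column names are just the ones from the example above):

import spark.implicits._

// Toy stand-ins for table1 and table2
val table1 = Seq(("k1", "a", "b"), ("k2", "c", "d")).toDF("key", "value1", "value2")
val table2 = Seq(("k1", "x", "y"), ("k2", "z", "w")).toDF("key", "value3", "value4")

table1.createOrReplaceTempView("table1")
table2.createOrReplaceTempView("table2")

// The join from the question, expressed through Spark SQL
val table3 = spark.sql(
  "SELECT * FROM table1 JOIN table2 ON table1.key = table2.key")

// Is there an Exchange (shuffle) in the plan, and is the result partitioned?
table3.explain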

MK.
  • Hm this might be dupe of http://stackoverflow.com/questions/28850596/co-partitioned-joins-in-spark-sql?rq=1 but I would appreciate links to discussions or examples too... – MK. Sep 16 '16 at 19:26
  • Do you mean http://stackoverflow.com/q/30995699/1560062? – zero323 Sep 16 '16 at 19:51

1 Answer


If you mean conversions between RDDs and Datasets, then the answer to both questions is negative.

RDD partitioning is defined only for RDD[(T, U)] and is lost once the RDD is converted to a Dataset. There are some cases where you can benefit from a pre-existing data layout, but join is not one of them, especially since RDDs and Datasets use different hashing techniques (the standard hashCode and MurmurHash respectively; you could of course mimic the latter by defining a custom RDD partitioner, but that is not really the point).

Similarly, information about partitioning is lost when a Dataset is converted to an RDD.
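To illustrate, a minimal sketch of both directions (assuming a SparkSession named spark):

import org.apache.spark.HashPartitioner
import spark.implicits._

// RDD-level partitioning (based on the standard hashCode)
val rdd = spark.sparkContext
  .parallelize(Seq(("key1", 1), ("key2", 2)))
  .partitionBy(new HashPartitioner(8))

rdd.partitioner                         // Some(HashPartitioner)

// Conversion drops that information; the Dataset carries no RDD partitioner
val ds = rdd.toDF("key", "value")

// To give the SQL planner a layout it can use, repartition on the SQL side
// (this uses Murmur hashing, not hashCode)
val repartitioned = ds.repartition(8, $"key")

// Going back to an RDD loses the SQL-side partitioning information as well
repartitioned.rdd.partitioner           // None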

You can use Dataset-level partitioning to optimize joins, though. For example, if the tables have been pre-partitioned:

val n: Int = ??? 

val df1 =  Seq(
  ("key1", "val1", "val2"), ("key2", "val3", "val4")
).toDF("key", "val1", "val2").repartition(n, $"key").cache

val df2 = Seq(
  ("key1", "val5", "val6"), ("key2", "val7", "val8")
).toDF("key", "val3", "val4").repartition(n, $"key").cache

a subsequent join on the key won't require an additional exchange (broadcast joins are disabled below so that the shuffle behavior is visible in the plans):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df1.explain
// == Physical Plan ==
// InMemoryTableScan [key#171, val1#172, val2#173]
//    +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
//          +- Exchange hashpartitioning(key#171, 3)
//             +- LocalTableScan [key#171, val1#172, val2#173]
df2.explain
// == Physical Plan ==
// InMemoryTableScan [key#201, val3#202, val4#203]
//    +- InMemoryRelation [key#201, val3#202, val4#203], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
//          +- Exchange hashpartitioning(key#201, 3)
//             +- LocalTableScan [key#201, val3#202, val4#203]
// 
df1.join(df3, Seq("key")).explain
// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#232, val6#233]
// +- *SortMergeJoin [key#171], [key#231], Inner
//    :- *Sort [key#171 ASC], false, 0
//    :  +- *Filter isnotnull(key#171)
//    :     +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
//    :           +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
//    :                 +- Exchange hashpartitioning(key#171, 3)
//    :                    +- LocalTableScan [key#171, val1#172, val2#173]
//    +- *Sort [key#231 ASC], false, 0
//       +- *Filter isnotnull(key#231)
//          +- InMemoryTableScan [key#231, val5#232, val6#233], [isnotnull(key#231)]
//                +- InMemoryRelation [key#231, val5#232, val6#233], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
//                      +- Exchange hashpartitioning(key#231, 3)
//                         +- LocalTableScan [key#231, val5#232, val6#233]

Obviously we don't really benefit from this with a single join, since the upfront repartition is itself a shuffle. So it makes sense only if a single table is reused for multiple joins.
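For example, a rough sketch of the multi-join case (df4 is a hypothetical extra table, built the same way as df1 and df2 above):

val df4 = Seq(
  ("key1", "a1", "b1"), ("key2", "a2", "b2")
).toDF("key", "val9", "val10").repartition(n, $"key").cache

// df1 pays for its shuffle and caching once; each of these joins can then
// line up on the existing hash partitioning of `key` without exchanging
// df1 again
df1.join(df2, Seq("key")).explain
df1.join(df4, Seq("key")).explain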

Spark can also benefit from the partitioning created by a join, so if we wanted to perform another join:

val df3 = Seq(
  ("key1", "val9", "val10"), ("key2", "val11", "val12")
).toDF("key", "val5", "val6")

df1.join(df3, Seq("key")).join(df3, Seq("key")).explain

we would benefit from the structure created by the first operation (note ReusedExchange):

// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#682, val6#683, val5#712, val6#713]
// +- *SortMergeJoin [key#171], [key#711], Inner
//    :- *Project [key#171, val1#172, val2#173, val5#682, val6#683]
//    :  +- *SortMergeJoin [key#171], [key#681], Inner
//    :     :- *Sort [key#171 ASC], false, 0
//    :     :  +- Exchange hashpartitioning(key#171, 200)
//    :     :     +- *Filter isnotnull(key#171)
//    :     :        +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
//    :     :              +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
//    :     :                    +- Exchange hashpartitioning(key#171, 3)
//    :     :                       +- LocalTableScan [key#171, val1#172, val2#173]
//    :     +- *Sort [key#681 ASC], false, 0
//    :        +- Exchange hashpartitioning(key#681, 200)
//    :           +- *Project [_1#677 AS key#681, _2#678 AS val5#682, _3#679 AS val6#683]
//    :              +- *Filter isnotnull(_1#677)
//    :                 +- LocalTableScan [_1#677, _2#678, _3#679]
//    +- *Sort [key#711 ASC], false, 0
//       +- ReusedExchange [key#711, val5#712, val6#713], Exchange hashpartitioning(key#681, 200)
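
As a quick sanity check (a sketch only; exact plans differ between Spark versions), the result of the join is itself hash-partitioned on key, so a downstream operation keyed on the same column should not need another shuffle:

val joined = df1.join(df3, Seq("key")).join(df3, Seq("key"))

// Look for the absence of an Exchange between the SortMergeJoin and the
// HashAggregate in the printed plan
joined.groupBy("key").count.explain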
zero323
  • So this is an example with Datasets, that's good. Will this map to Spark SQL? What if I create a new DF using Spark SQL by joining 2 DFs partitioned on the same column? Will the resulting DF be partitioned? – MK. Sep 16 '16 at 20:29
  • 1
    Yes, there is no execution difference between SQL and `DataFrame` API. – zero323 Sep 16 '16 at 20:33
  • and my understanding is that it will only be smart in Spark 2.0, right? – MK. Sep 16 '16 at 20:51
  • 1
    As far as I remember basic optimizations should work in 1.6 as well but for full benefits you'll need 2.0+ – zero323 Sep 16 '16 at 21:15