If you mean conversions between RDDs
and Datasets
then the answer to both question is negative.
RDD partitioning is defined only for RDD[(T, U)]
and will be lost after RDD
is converted to a Dataset
. There are some cases when you can benefit per-existing data layout but join
is not one of these especially that RDDs
and Datasets
use different hashing techniques (standard hashCode
and MurmurHash
respectively. You can of course mimic the latter one by defining custom partitioner RDD
but it is not really the point).
Similarly information about partitioning is lost when Dataset
is converted to RDD
.
You can use Dataset
partitioning to which can be used to optimize joins
though. For example if tables have been pre-partitioned:
val n: Int = ???
val df1 = Seq(
("key1", "val1", "val2"), ("key2", "val3", "val4")
).toDF("key", "val1", "val2").repartition(n, $"key").cache
val df2 = Seq(
("key1", "val5", "val6"), ("key2", "val7", "val8")
).toDF("key", "val3", "val4").repartition(n, $"key").cache
subsequent join
based on the key
won't require additional exchange.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1
df1.explain
// == Physical Plan ==
// InMemoryTableScan [key#171, val1#172, val2#173]
// +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#171, 3)
// +- LocalTableScan [key#171, val1#172, val2#173]
df2.explain
// == Physical Plan ==
// InMemoryTableScan [key#201, val3#202, val4#203]
// +- InMemoryRelation [key#201, val3#202, val4#203], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#201, 3)
// +- LocalTableScan [key#201, val3#202, val4#203]
//
df1.join(df3, Seq("key")).explain
// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#232, val6#233]
// +- *SortMergeJoin [key#171], [key#231], Inner
// :- *Sort [key#171 ASC], false, 0
// : +- *Filter isnotnull(key#171)
// : +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
// : +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// : +- Exchange hashpartitioning(key#171, 3)
// : +- LocalTableScan [key#171, val1#172, val2#173]
// +- *Sort [key#231 ASC], false, 0
// +- *Filter isnotnull(key#231)
// +- InMemoryTableScan [key#231, val5#232, val6#233], [isnotnull(key#231)]
// +- InMemoryRelation [key#231, val5#232, val6#233], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// +- Exchange hashpartitioning(key#231, 3)
// +- LocalTableScan [key#231, val5#232, val6#233]
Obviously we don't really benefit from that on a single join. So it makes sense only if a single table is used for multiple joins
.
Also Spark can benefit from the partitioning created by join
so if we wanted to perform another join
:
val df3 = Seq(
("key1", "val9", "val10"), ("key2", "val11", "val12")
).toDF("key", "val5", "val6")
df1.join(df3, Seq("key")).join(df3, Seq("key"))
we would benefit from the structure created by the first operation (note ReusedExchange
):
// == Physical Plan ==
// *Project [key#171, val1#172, val2#173, val5#682, val6#683, val5#712, val6#713]
// +- *SortMergeJoin [key#171], [key#711], Inner
// :- *Project [key#171, val1#172, val2#173, val5#682, val6#683]
// : +- *SortMergeJoin [key#171], [key#681], Inner
// : :- *Sort [key#171 ASC], false, 0
// : : +- Exchange hashpartitioning(key#171, 200)
// : : +- *Filter isnotnull(key#171)
// : : +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)]
// : : +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
// : : +- Exchange hashpartitioning(key#171, 3)
// : : +- LocalTableScan [key#171, val1#172, val2#173]
// : +- *Sort [key#681 ASC], false, 0
// : +- Exchange hashpartitioning(key#681, 200)
// : +- *Project [_1#677 AS key#681, _2#678 AS val5#682, _3#679 AS val6#683]
// : +- *Filter isnotnull(_1#677)
// : +- LocalTableScan [_1#677, _2#678, _3#679]
// +- *Sort [key#711 ASC], false, 0
// +- ReusedExchange [key#711, val5#712, val6#713], Exchange hashpartitioning(key#681, 200)