I am rewriting a Spark application to use more DataFrame operations, for efficiency and robustness. However, there is one part of the application that cannot be expressed with DataFrames, so I have to drop down to the RDD API. Stripped to its essentials, the code looks like this:
C = A.join(B, join_key) # join_key is a string naming a column
D = C.rdd.mapPartitions(do_something)
For correct operation, do_something requires that C.rdd be partitioned by join_key. I think this will be the case, because an equijoin works by partitioning the data by the join key and then forming pairs whose key values match. In a Spark RDD join, the pairs are produced lazily by an iterator over each partition's data, so a pair cannot leave the partition in which it was formed unless I explicitly materialize the iterator into a list of pairs and repartition the result, which I am not doing here. I expect the same is true of DataFrame joins.
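The closest I can get to checking this is inspecting the physical plan and the RDD derived from it. This is only a diagnostic sketch, and it assumes the planner chooses a shuffle-based join; a broadcast join would not repartition the streamed side by the key at all:
C.explain(True)  # look for an exchange / hashpartitioning step on join_key under the join; a BroadcastHashJoin would mean no shuffle by the key
print(C.rdd.getNumPartitions())  # the physical partition layout carries over into the RDD
print(C.rdd.partitioner)  # but the RDD API reports no partitioner here (prints None)
So even when the plan shows the shuffle I want, nothing at the RDD level tells me the partitioning is there, which is what makes me nervous.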
All that said, the reasoning above does not prove that the desired partitioning is guaranteed. I'm relying on details of the Spark implementation that are not promised by the API, and I'm not sure that is 100% safe. There's no guarantee the Catalyst optimizer won't place a partition boundary within a group of rows sharing the same key, breaking the group up and making my algorithm incorrect.
To ensure the desired partitioning, I could explicitly repartition by key, e.g. C.rdd.keyBy(lambda row: row[join_key]).partitionBy(C.rdd.getNumPartitions()) (RDD.partitionBy works on key-value pairs and takes a partition count, not a key function), before applying my do_something function, as in the sketch below. But I'm worried that this could trigger a lot of unnecessary serialization, shuffling, or other overhead.
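Here is roughly what the explicit version would look like. This is only a sketch: I key the rows first because partitionBy needs pairs, drop the keys again afterwards, and simply reuse the current partition count rather than choosing a better one:
num_parts = C.rdd.getNumPartitions()
keyed = C.rdd.keyBy(lambda row: row[join_key])  # (key, Row) pairs
partitioned = keyed.partitionBy(num_parts)  # hash-partitions by key, so equal keys land in the same partition
rows = partitioned.values()  # drop the helper key; this is a narrow map, so rows stay in their partitions
D = rows.mapPartitions(do_something)
This is exactly the extra shuffle (plus Python-side keying of every row) that I would like to avoid if the join already guarantees the layout.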
It also appears that I could use DISTRIBUTE BY from HiveQL, according to this blog post, but again I do not know what overhead this might trigger.
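If I went that route, I believe it would look something like the following sketch. I'm assuming a HiveContext bound to sqlContext, as in the blog post, and "c_table" is just a hypothetical temp table name:
C.registerTempTable("c_table")
C_dist = sqlContext.sql("SELECT * FROM c_table DISTRIBUTE BY {0}".format(join_key))  # shuffles rows so equal keys share a partition
D = C_dist.rdd.mapPartitions(do_something)
# DataFrame.repartition with a column argument is reportedly equivalent in 1.6:
# C_dist = C.repartition(C[join_key])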
My question is: is it safe to rely on the implicit partitioning induced by the join, or should I ensure it explicitly? If I do need to ensure it explicitly, what is the most efficient way to do so? I'm working with PySpark 1.6.2.