1

I have 2 big parquet Dataframes and I want to join them on a userId.

What should I do to get high performance :

Should I modify the code that write those files in order to :

  • partitionBy on the userId (very sparse).
  • partitionBy on the first N char of the userId (afaik, If data are already partitioned on the same key, the join will occur with no shuffle)

On the read side, is it better to use RDD or DataFrame ?

Yann Moisan
  • 8,161
  • 8
  • 47
  • 91
  • 1
    RDD or DataFrame depends on what you want to do with your data. DataFrames are optimized RDD but there are some operations that perform better in RDD and most of them in DataFrame. About partitionBy, you got it 100% correct. You can go with HashPartitioner. – Ramesh Maharjan Jul 25 '17 at 17:53
  • But there is a slight hiccup, i.e., `partitionBy` first sorts the data and then saves it. It will make the Writes slow, although Reads/Joins will get fast. So, be careful before using `partitionBy`. – himanshuIIITian Jul 26 '17 at 04:49
  • 1
    @himanshuIIITian note that you can speed up performance of `partitionBy(colX,colY)` vastly if you call `repartion(colX, colY)` _before_ calling `write`. So `df.repartion(colX, colY).write.partitionBy(colX, colY)` is way faster than calling `df.write.partitionBy(colX, colyY)` :) – Glennie Helles Sindholt Jul 26 '17 at 10:23
  • @GlennieHellesSindholt I tried that and I found that the number of partitions getting created by `repartition` is 200 (default). Whereas the number of unique values in the dataframe was only one. This actually slowed down the write in storage. For this, I have posted a question on stackoverflow already - https://stackoverflow.com/questions/44878294/why-spark-dataframe-is-creating-wrong-number-of-partitions – himanshuIIITian Jul 26 '17 at 16:58
  • @Yann Moisan Do you have high repetition in your userIDs in each file? – Abdulrahman Jul 26 '17 at 20:50
  • @Abdulrahman no – Yann Moisan Aug 01 '17 at 14:13

1 Answers1

1

You can perform a bucketBy operation before you save to parquet file.

val NUM_BUCKETS = 20
df1.write.mode(SaveMode.Overwrite).bucketBy(NUM_BUCKETS, "userId").saveAsTable("bucketed_large_table_1")
df2.write.mode(SaveMode.Overwrite).bucketBy(NUM_BUCKETS, "userId").saveAsTable("bucketed_large_table_2")
spark.sql("select * from a join b on a.num1 = b.num2").collect()

Doing it like this way will prevent a shuffle when the join operation performed.

Keep in mind that in order to do this you need to enable hive with .enableHiveSupport, as the save to parquet operation doesn't currently support the bucketBy method.

wllmtrng
  • 182
  • 3