
We have 2 Hive tables which are read in Spark and joined on a join key, let's call it user_id. We then write this joined dataset to S3 and register it in Hive as a third table, for subsequent tasks to use. One of the other columns in the joined dataset is called keychain_id.

We want to group all user records belonging to the same keychain_id into the same partition, to avoid shuffles later. Can I do a repartition("keychain_id") before writing to S3 and registering it in Hive, and when I read the data back from this third table, will it still have the same partition grouping (all users with the same keychain_id in the same partition)? I'm trying to avoid doing a repartition("keychain_id") every time I read from this third table. Can you please clarify? If there is no guarantee that the partition grouping is retained on read, is there another efficient way to do this, other than caching?

user2221654

1 Answer


If there is no data skew in keychain_id (skew would lead to differently sized partition files), you can write with partitionBy:

 df.write\
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")

Update:

In order to "retain the grouping of user records having the same keychain_id in the same dataframe partition",

you could repartition beforehand, on the number of distinct keychain_id values and/or the column itself:

from pyspark.sql import functions as F
n = df.select(F.col('keychain_id')).distinct().count()

df.repartition(n, F.col("keychain_id"))\
 .write \
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")

or

df.repartition(n)\
 .write \
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")
murtihash
  • this could be useful if we have to filter by keychain_id while consuming the data, but will it retain the grouping of user records having the same keychain_id in the same dataframe partition when consuming all the data from this table? – user2221654 Feb 27 '20 at 00:42
  • @user2221654 check my updated answer, or for more info refer to https://stackoverflow.com/questions/60048027/how-to-manage-physical-data-placement-of-a-dataframe-across-the-cluster-with-pys/60048672#60048672 – murtihash Feb 27 '20 at 01:15
  • just one question related to the above: when reading back the data stored in the Hive table, won't the number of partitions be determined by the split size based on the file system? If yes, then records belonging to the same keychain_id, though written to the same file, could still end up in different partitions when read again. Is that right, or am I missing something here? – user2221654 Feb 27 '20 at 04:48
  • I think if you are inserting into a Hive external table and then reading from there, the partitions should be preserved, unless the partitions have data skew (files larger than 128 MB, or whatever your default block size in HDFS is) – murtihash Feb 27 '20 at 04:58
  • okay thanks for the clarification. I will accept the answer, but also will keep in mind that data skew could affect it. – user2221654 Feb 27 '20 at 17:34