-1

I keep using "insert overwrite table table_name partition(partition_column) query" to write data into my table, but the problem is the number of files it generates.

So I started using the spark.sql.shuffle.partitions property to fix the number of files.

The problem now is that some partitions hold very little data while others hold a huge amount. If I choose the shuffle partition count to suit the large partitions, many unnecessary small files are created; if I choose it to suit the small partitions, the job starts failing with memory issues.

Is there a good way to solve this?

2 Answers

0

Consider using .repartition() in this case: it produces partitions of roughly equal size, which can also improve processing time.

  • Work out the number of partitions for the dataframe from, for example, its row count, then apply repartition with that number.

  • Refer to this link for a way to derive the repartition count dynamically from the number of rows in the dataframe.
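
As a rough illustration of sizing by row count, here is a minimal sketch. The object name, the function name, and the rows-per-file target are all hypothetical (nothing here comes from the linked post), and the right target depends on how wide your rows are. The helper is plain Scala; the Spark calls appear only as commented usage and assume a DataFrame named df is in scope.

```scala
object RowBasedPartitioning {
  // Hypothetical helper: ceiling division of the row count by a chosen
  // rows-per-file target, so a partial last chunk still gets a partition.
  def partitionsForRows(rowCount: Long, targetRowsPerFile: Long): Int = {
    require(targetRowsPerFile > 0, "targetRowsPerFile must be positive")
    val needed = (rowCount + targetRowsPerFile - 1) / targetRowsPerFile
    // Keep at least one partition and stay within the Int range that repartition() accepts.
    math.max(1L, math.min(needed, Int.MaxValue.toLong)).toInt
  }
}

// Usage with Spark (assumes a DataFrame named df; the 1,000,000 target is illustrative):
// val n = RowBasedPartitioning.partitionsForRows(df.count(), 1000000L)
// val resized = df.repartition(n)
```

Keep in mind that df.count() triggers a full pass over the data, so it may be worth caching the DataFrame first if it is expensive to compute.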

notNull
0

The utility you are looking for is SizeEstimator; it returns an estimate, in bytes, of how much memory an object occupies on the JVM heap (not its size on disk), so treat the result as a rough guide. Spark is horrendous when it comes to large numbers of files. To control the number of files being output, run the repartition command, because the number of output files from Spark is directly tied to the number of partitions the object has. In my example below I take the estimated size of an arbitrary input data frame and divide it by 128 MB (134217728 bytes) to find the "true" number of partitions. The reason for the + 1 is that integer division on Longs and Ints truncates, so without it a small input would give 0 partitions, which is impossible.

Hope this helps!

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.util.SizeEstimator

// estimated in-memory size of the input, in bytes
val estimatedBytes: Long = SizeEstimator.estimate(inputDF.rdd)
// find its appropriate number of partitions, targeting 128 MB (134217728 bytes) each
val numPartitions: Long = (estimatedBytes / 134217728L) + 1
// repartition to that many partitions before writing it out
val outputDF: DataFrame = inputDF.repartition(numPartitions.toInt)
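
A single partition count for the whole DataFrame does not by itself address the skew described in the question. One commonly used variation, which is not part of the answer above, is to repartition on the table's partition column plus a random "salt" whose range grows with the number of rows for that partition value. The sketch below is only an illustration under stated assumptions: the object and function names, the temporary column names _files and _salt, and the rows-per-file target are all hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{ceil, col, floor, greatest, lit, rand}

object SkewAwareRepartition {
  // Hypothetical helper. Assumes df has no columns named _files or _salt.
  // Rows whose partition column is null are dropped by the inner join below.
  def repartitionForWrite(df: DataFrame, partitionCol: String, targetRowsPerFile: Long): DataFrame = {
    // Remember the original column order; the join would otherwise move partitionCol to the front.
    val originalCols = df.columns.map(col)

    // Rows per partition value, turned into a desired file count (at least 1).
    val fileCounts = df.groupBy(col(partitionCol)).count()
      .select(
        col(partitionCol),
        greatest(lit(1L), ceil(col("count") / lit(targetRowsPerFile.toDouble))).as("_files"))

    df.join(fileCounts, Seq(partitionCol))
      // Random salt in [0, _files): spreads a large partition value over several tasks.
      .withColumn("_salt", floor(rand() * col("_files")))
      // Rows sharing (partition value, salt) go to one task, so each partition
      // value is written by at most _files tasks.
      .repartition(col(partitionCol), col("_salt"))
      .select(originalCols: _*)
  }
}
```

Small partition values end up with a single salt and therefore a single file, while large ones are split across several tasks, which is the trade-off the question is asking about. The extra groupBy and join cost an additional pass over the data, so whether this pays off depends on the job.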
afeldman