
I have 160 GB of data, partitioned on a DATE column and stored in Parquet format, running on Spark 1.6.0. I need to write the output Parquet files so that the files in each partition are equally sized, with a fixed size of, say, 100 MB each.

I tried the code below:

val blockSize= 1024*1024*100
sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize)
sc.hadoopConfiguration.setInt("parquet.block.size",blockSize)

df1.write.partitionBy("DATE").parquet("output_file_path")

The above configuration is not working: it creates multiple files according to the default number of partitions, not 100 MB files.

Vincent Doba
warner
  • As far as I know, that property is only an upper bound, not a fixed number. And Spark will always save each partition separately until you repartition the dataframe – OneCricketeer Apr 13 '18 at 23:33
  • Is there any other way? I have to store equal-sized files without using repartition, because my data is not evenly partitioned: a few partitions contain a lot of data and others contain only a few MB. – warner Apr 13 '18 at 23:42
  • Then you'll never get equally sized files anyway. I don't understand the purpose of this... In fact, HDFS and S3 even prefer larger files for extra processing – OneCricketeer Apr 13 '18 at 23:47
  • Possible duplicate. Maybe this is what you are looking for https://stackoverflow.com/questions/39187622/how-do-you-control-the-size-of-the-output-file/39282893 – Gladiator May 25 '18 at 04:23

2 Answers


It's not possible to get exactly the same size for every file, but you can give Spark enough hints to keep the files close to a target size. The general goal is for each file to be about the size of one HDFS block and to hold one (or more) row groups. You want each row group to fit in one HDFS block. If a row group does not fit in one block, additional network calls are needed to fetch another HDFS block before the row group can be read completely.
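To make the row-group-versus-block point concrete, here is a minimal arithmetic sketch. It assumes, purely for illustration, that a row group starts exactly at a block boundary; the object and function names are hypothetical and not part of Spark or Parquet.

```scala
object RowGroupFit {
  // How many HDFS blocks a row group touches, assuming (for illustration)
  // that the row group starts exactly at a block boundary.
  def blocksSpanned(rowGroupBytes: Long, blockBytes: Long): Long = {
    require(rowGroupBytes > 0 && blockBytes > 0, "sizes must be positive")
    (rowGroupBytes + blockBytes - 1) / blockBytes // ceiling division
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    // A row group no larger than the block stays within one block.
    println(blocksSpanned(256 * mb, 256 * mb))
    // A row group larger than the block needs a second block to be read.
    println(blocksSpanned(256 * mb, 128 * mb))
  }
}
```

Any result above 1 means a reader has to fetch more than one block to read that row group, which is the situation the paragraph above tries to avoid.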

To achieve this, do the following:

  • Set spark.sql.files.maxPartitionBytes in the Spark conf to 256 MB (equal to your HDFS block size). Note that this is a read-side setting and is available from Spark 2.0 onward.
  • Set parquet.block.size in the Parquet writer options in Spark to 256 MB.

tradesDF.write.option("parquet.block.size", 256 * 1024 * 1024)

IceMan

You can try the following approach.

First, estimate the size of a single row in your data.
This is difficult to do accurately (since the Parquet file contains metadata as well), but you can take 1000 rows of your data, write them to a file, and divide the file size by the row count to estimate the size of a single row.

From that, calculate how many rows will fit in 100 MB:

N = 100MB / size_of_row 
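As a rough worked example of that formula, the sketch below computes N from a measured sample. The names are hypothetical and the sample figures (1000 rows taking 250,000 bytes) are made up for illustration; substitute the size you actually measure.

```scala
object RowsPerFile {
  // Estimate bytes per row from a sample that was written to Parquet.
  def bytesPerRow(sampleBytes: Long, sampleRows: Long): Double = {
    require(sampleRows > 0, "sample must contain at least one row")
    sampleBytes.toDouble / sampleRows
  }

  // N = target file size / size of one row, rounded down, never below 1.
  def rowsPerFile(targetBytes: Long, sampleBytes: Long, sampleRows: Long): Long =
    math.max(1L, (targetBytes / bytesPerRow(sampleBytes, sampleRows)).toLong)

  def main(args: Array[String]): Unit = {
    val target = 100L * 1024 * 1024 // 100 MB
    // Hypothetical measurement: 1000 rows took 250,000 bytes on disk.
    val n = rowsPerFile(target, 250000L, 1000L)
    println(s"rows per 100 MB file: $n")
  }
}
```

A small sample tends to overstate the per-row size, because the fixed file metadata is spread over few rows, so treat N as a starting point and adjust after inspecting the real output files.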

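Now you can create an additional column with a bucket id for each row:

// row_number over an ordered window gives each row a unique position within its DATE (rank needs an ordering and repeats on ties)
val df2 = df.withColumn("bucket", (row_number().over(Window.partitionBy("DATE").orderBy("DATE")) / N).cast(IntegerType))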
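To see how the bucket id groups rows, here is a plain-Scala sketch of the same integer division, without Spark. It is an illustration under assumptions: positions are taken to be 1-based, the helper names are hypothetical, and it subtracts 1 before dividing so that every full bucket holds exactly N rows (the expression above divides the position directly, which makes the first bucket one row short).

```scala
object BucketIds {
  // Bucket for a row, given its 1-based position within its DATE group.
  // Subtracting 1 keeps exactly rowsPerFile rows in every full bucket.
  def bucketOf(position: Long, rowsPerFile: Long): Int = {
    require(position >= 1 && rowsPerFile >= 1, "position and rowsPerFile must be >= 1")
    ((position - 1) / rowsPerFile).toInt
  }

  // Number of buckets (target files) needed for one DATE group.
  def bucketCount(rowsInDate: Long, rowsPerFile: Long): Int =
    if (rowsInDate == 0) 0 else bucketOf(rowsInDate, rowsPerFile) + 1

  def main(args: Array[String]): Unit = {
    val n = 4L // tiny made-up N so the grouping is easy to see
    val buckets = (1L to 10L).map(bucketOf(_, n))
    println(buckets.mkString(", "))
    println(bucketCount(10L, n))
  }
}
```

With N = 4 and ten rows, the first eight rows fill two buckets and the last two rows land in a third, smaller bucket. The same thing happens to the tail of every date, so the last file of each date will usually be smaller than the target.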

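Now you can repartition your data by both the date and the bucket:

df2
  .repartition($"DATE", $"bucket")
  .drop("bucket")        // DataFrame has drop, not dropColumn
  .write
  .partitionBy("DATE")   // keeps the per-DATE directory layout from the question
  .parquet(...)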
lev