32

In Spark, what is the best way to control the size of the output files? For example, in log4j we can specify a max file size, after which the file rotates.

I am looking for a similar solution for Parquet files. Is there a max file size option available when writing a file?

I have a few workarounds, but none is good. If I want to limit files to 64 MB, one option is to repartition the data and write it to a temp location, and then merge the files together using the file sizes in the temp location. But getting the correct file size is difficult.

Rob
  • 14,746
  • 28
  • 47
  • 65
user447359
  • 477
  • 1
  • 5
  • 13
  • 1
    Just curious to know what the use case is for having the same size in output files. – Sachin Jain Aug 28 '16 at 04:24
  • Trying to keep file sizes consistent. For example, when I write files to different partitions, some partition files are 10 times bigger. df.repartition(35).write.mode(SaveMode.Overwrite).partitionBy(list:_*).parquet("tmp5") – user447359 Aug 29 '16 at 20:03
  • I have made a working solution (plug & play). Please see my answer below – Sarath Subramanian Dec 11 '22 at 06:07

6 Answers

52

It's impossible for Spark to control the size of Parquet files directly, because the DataFrame in memory needs to be encoded and compressed before it is written to disk. Before this process finishes, there is no way to estimate the actual file size on disk.

So my solution is:

  • Write the DataFrame to HDFS, df.write.parquet(path)
  • Get the directory size and calculate the number of files

    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val dirSize = fs.getContentSummary(new Path(path)).getLength
    val fileNum = math.max(1L, dirSize / (512 * 1024 * 1024)).toInt  // let's say 512 MB per file, at least 1
    
  • Read the directory and re-write to HDFS

    val reloaded = sqlContext.read.parquet(path)
    reloaded.coalesce(fileNum).write.parquet(newPath)
    

    Do NOT reuse the original df here; re-read it from disk, otherwise the lineage will be recomputed and the whole job will run a second time.

  • Delete the old directory and rename the new directory back

    fs.delete(new Path(path), true)
    fs.rename(new Path(newPath), new Path(path))
    

The drawback of this solution is that it writes the data two times, which doubles the disk IO, but for now there is no other way.
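Put together, a rough end-to-end sketch of the above using the Spark 2.x SparkSession API (spark, df, path and newPath are placeholders, and 512 MB is just the example target):

import org.apache.hadoop.fs.{FileSystem, Path}

val targetBytes = 512L * 1024 * 1024   // desired size per output file

// First pass: write, then measure the resulting directory on HDFS
df.write.parquet(path)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val dirSize = fs.getContentSummary(new Path(path)).getLength
val fileNum = math.max(1L, dirSize / targetBytes).toInt

// Second pass: re-read from disk (do not reuse df) and coalesce to the computed file count
spark.read.parquet(path).coalesce(fileNum).write.parquet(newPath)

// Swap the directories
fs.delete(new Path(path), true)
fs.rename(new Path(newPath), new Path(path))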

soulmachine
  • 3,917
  • 4
  • 46
  • 56
  • Can I do something similar with Spark SQL? I want to control fileNum and don't quite care about file size for each file. – whatsnext May 29 '19 at 20:41
  • 4
    @soulmachine - could you elaborate on "Do NOT reuse the original df, otherwise it will trigger your job two times." – BI Dude Feb 19 '20 at 12:53
  • Is this still true? Or have 3rd-party libraries been developed that do this for us? – haneulkim Jun 07 '23 at 01:21
7

There isn't a roll-after-specific-size option in Spark yet, but there is the second best: roll after a specific number of records.

Since Spark 2.2 it's possible to set maxRecordsPerFile.
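For example, a minimal sketch (df, the 1,000,000-record limit and the output path are placeholders, not values from the question):

// per write: no output file will contain more than 1,000,000 records
df.write
  .option("maxRecordsPerFile", 1000000)
  .parquet("/path/to/output")

// or globally, for all writes in the session:
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)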

See also https://stackoverflow.com/a/48143315/630269

selle
  • 868
  • 1
  • 10
  • 27
  • 2
    Do you know if there is anything like minRecordsPerFile? – Yeikel Oct 14 '19 at 19:14
  • 1
    Sorry no, I haven't seen that. Maybe you can create a question and describe your use case there? – selle Oct 15 '19 at 07:47
  • I believe the full name for this setting is spark.sql.files.maxRecordsPerFile ? https://spark.apache.org/docs/latest/configuration.html – linehrr Nov 09 '22 at 16:33
  • It depends on your context. With a spark-dataframe: `df.write.option("maxRecordsPerFile", 1000000)...` – selle Nov 10 '22 at 08:39
3

As others have mentioned, you can't explicitly hit a target size per file. You can, however, get all your output files to have about the same number of rows. If you know on average what your compression ratio looks like, evenly distributing rows across output files up to a max_rows will get you consistent sizes close to your target.

This is easier said than done if you're doing a partitionBy before you write. Here's some pseudocode for how we do it:

-- #3 distribute partitionC's rows based on the partition columns plus a random integer that picks the file number
select * from dataframe_table as t4
inner join

    -- #2 calculate the number of output files per partition
    (select t2.partitionA, t2.partitionB,
            cast(t2.partition_num_rows / max_rows as int) + 1 as partition_num_files
     from

        -- #1 determine the number of rows in each output partition
        (select partitionA, partitionB, count(*) as partition_num_rows
         from dataframe_table
         group by partitionA, partitionB) as t2
    ) as t3

on t3.partitionA = t4.partitionA and t3.partitionB = t4.partitionB
distribute by (t4.partitionA, t4.partitionC, floor(rand() * t3.partition_num_files)) sort by (partitionC, sortfield)

I included a sort on the partition here because in our use-case this drastically improves compression while only minimally impacting performance.

And if your results from steps 1 and 2 are sufficiently small, Spark may be able to broadcast join them to speed things up.
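If you prefer the DataFrame API over SQL, roughly the same idea can be sketched like this (the column names partitionA/partitionB/partitionC and sortfield come from the pseudocode above; maxRows, the output path and the choice of distribution columns are my own assumptions for the example):

import org.apache.spark.sql.functions.{col, floor, rand}

val maxRows = 1000000L  // rows per output file, derived from your average compression ratio

// #1/#2: rows per output partition -> number of output files per partition
val filesPerPartition = df
  .groupBy("partitionA", "partitionB")
  .count()
  .withColumn("partition_num_files", (col("count") / maxRows).cast("int") + 1)
  .drop("count")

// #3: spread each partition's rows over its files with a random file index,
// then sort within partitions before writing
df.join(filesPerPartition, Seq("partitionA", "partitionB"))
  .withColumn("file_idx", floor(rand() * col("partition_num_files")))
  .repartition(col("partitionA"), col("partitionB"), col("file_idx"))
  .sortWithinPartitions("partitionC", "sortfield")
  .drop("partition_num_files", "file_idx")
  .write
  .partitionBy("partitionA", "partitionB")
  .parquet("/path/to/output")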

MrChrisRodriguez
  • 493
  • 6
  • 12
2

OK, here's my perfected method, taking into account target file size, memory usage and execution time. These files also include Snappy compression and dictionary encoding.

My HDFS block size is 128 MB (128 * 1024 * 1024 bytes):

<property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
</property>

Here are my final Parquet files, which are all very close to the HDFS block size.

133916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
133459404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
133668445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
134004329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
134015650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
132053162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
132917851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
122594040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet

This is how I did it:

A. Come up with a rough number of rows that generates a bunch of SMALL Parquet files in the range of 10 MB or so. In my case I chose 200,000 records. Lots of smaller Parquet files are more space efficient than one large Parquet file, because dictionary encoding and other compression techniques get abandoned if the data in a single file has more variety. Writing out roughly 10 MB at a time also releases memory.

Your files will look something like this:

07916650 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0001.parquet
12259404 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0002.parquet
11368445 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0003.parquet
07044329 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0004.parquet
13145650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0005.parquet
08534162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0006.parquet
12178451 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0007.parquet
11940440 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0008.parquet
09166540 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0009.parquet
12594044 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0010.parquet
11684245 2018-07-06 07:05 /year=2018/month=01/HoldingDetail_201801_0011.parquet
07043129 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0012.parquet
13153650 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0013.parquet
08533162 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0014.parquet
12137851 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0015.parquet
11943040 2018-07-06 07:06 /year=2018/month=01/HoldingDetail_201801_0016.parquet

B. Create a list of your smaller Parquet files whose file sizes, when added together, do not exceed your HDFS block size (see the sketch after these steps). In the example above:

/year=2018/month=01/HoldingDetail_201801_0001.parquet
to
/year=2018/month=01/HoldingDetail_201801_0012.parquet
plus
/year=2018/month=01/HoldingDetail_201801_0014.parquet

Together they take up 133,408,651 bytes.

C. Open up a new file called HoldingDetail_201801_temp.parquet

Read all the smaller files in your list one at a time and write them to the temp file as Parquet ROW GROUPS. It is very important to write each file in as a row group, which preserves the compression encoding and guarantees that the number of bytes written (minus schema metadata) will be the same as the original file size.

Delete all the smaller files in the list. Rename temp file to HoldingDetail_201801_0001.parquet.

Repeat steps B and C for the remaining smaller files to create *_0002.parquet, *_0003.parquet, *_0004.parquet, etc., which will be target files with sizes just under the HDFS block size.

(I also add a check so that if the sum of file sizes is already > 0.95 * dfs.blocksize, I just go ahead and merge the files found so far.)
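For readers asking for code: here is a hedged Scala sketch of step B only. It is not the author's original code; it packs files greedily in listing order (whereas the example above picks whichever combination best fills the block) and it leaves the row-group copy of step C as a comment; that copy can be done with parquet-mr's low-level writer (the mechanism behind the parquet-tools merge command).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

val conf = new Configuration()
val fs = FileSystem.get(conf)
val blockSize = 128L * 1024 * 1024   // dfs.blocksize

// Step B: group the small files into bins whose total size stays under the block size
val smallFiles = fs.listStatus(new Path("/year=2018/month=01"))
  .filter(_.getPath.getName.endsWith(".parquet"))
  .sortBy(_.getPath.getName)

val bins = ArrayBuffer(ArrayBuffer.empty[Path])
var binSize = 0L
for (f <- smallFiles) {
  if (binSize + f.getLen > blockSize) {   // this file would overflow the bin: start a new one
    bins += ArrayBuffer.empty[Path]
    binSize = 0L
  }
  bins.last += f.getPath
  binSize += f.getLen
}

// Step C (not shown): for each bin, copy every file's row groups into one temp file,
// delete the inputs, and rename the temp file into place.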

David Lee
  • 129
  • 1
  • 3
  • 2
    It's been a long time since you did this exercise, but I think readers would find it useful if you could share the Spark code that you used. :D In any case, good solution. – akki Apr 05 '21 at 07:51
0

Though I am answering late, I have made a working solution.

Concept
First of all, you need to already have the exported files in a directory (an Azure Blob container). These files are not yet split by file size (in my case, I used a temp folder). We need to read these files, apply the logic given in the code below, and regenerate the files with the expected file size you need.

Code logic
The 1st parameter to the method is the total number of records in the files that we are going to read. The 2nd parameter is the expected size per file in MB.

temp_path is where I have kept the exported CSV files. target_path is where I will export the files with the file size logic applied. I read the file sizes using dbutils and then work out how many records a file should contain to match the expected size per file in MB.

After that, I read the files into a dataframe df_temp. In the last line, make sure you keep the repartition value at 1. maxRecordsPerFile specifies how many records a file should contain so that it matches the expected size per file in MB (which we calculated earlier). The reason we use maxRecordsPerFile is that we don't know up front how many records produce a file of the size we expect; the method below determines this, so we can export the files with that many records per file.

from functools import reduce
from operator import add

def split_files_by_size(total_records, file_size_in_mb):
    temp_path = '/mnt/exported_files/temp'
    target_path = '/mnt/exported_files'
    csv_files = dbutils.fs.ls(temp_path)
    # sizes (in bytes) of the exported CSV files sitting in the temp folder
    files_size = [file.size for file in csv_files if file.name.endswith('.csv')]
    # how many records fit into one file of roughly file_size_in_mb
    records_per_file = int(total_records / ((reduce(add, files_size) / 1024 / 1024) / file_size_in_mb))
    df_temp = (spark.read.option('header', 'true').option("delimiter", ",")
               .option("quoteAll", "true").option("quote", "\"")
               .format("csv").load(temp_path))
    (df_temp.repartition(1).write.option("maxRecordsPerFile", records_per_file)
     .option('header', 'true').option("delimiter", ",").option("quoteAll", "true")
     .option("quote", "\"").mode("overwrite").csv(target_path))

To execute this, use the below example code.

split_files_by_size(50000,50)

The files were generated as expected. In my case, I had a total of 50k records taking up 87 MB, and I had to generate 2 files out of it: a 50 MB file and a 37 MB file.


In case you need two files of equal size, you can change the repartition value to 2.

df_temp.repartition(2).write.option("maxRecordsPerFile", records_per_file).option('header','true').option("delimiter",",").option("quoteAll", "true").option("quote", "\"").mode("overwrite").csv(target_path)


The drawback of this solution is that it has to write the files an extra time, but there is no other way as of now.

Sarath Subramanian
  • 20,027
  • 11
  • 82
  • 86
-3

Here is my solution, and it works fine for me.

val repartition_num = 20
val hqc = new org.apache.spark.sql.hive.HiveContext(sc)
val t1 = hqc.sql("select * from customer")

// 20 parquet files will be generated in the HDFS dir:
// just control the number of output files via the partition count
t1.repartition(repartition_num).saveAsParquetFile(parquet_dir)
// (saveAsParquetFile is the old Spark 1.x API; on Spark 2.x+ use .write.parquet(parquet_dir) instead)

And this is the result:

> hadoop fs -ls /tpch-parquet/customer/*.parquet  | wc -l
20
Jia
  • 1,483
  • 11
  • 10
  • 4
    -1. This doesn't answer the OP's question (control file size) but rather a completely different question (controlling the number of files) – synhershko Aug 01 '17 at 18:58
  • if you repartition to a larger number, it will shrink the average file size as well. wfm – James Mar 29 '18 at 18:30