
I'm using PySpark to process some data and write the output to S3. I have created a table in Athena which will be used to query this data.

The data is in the form of JSON strings (one per line). The Spark code reads the file, partitions it based on certain fields and writes it to S3.

For a 1.1 GB input file, I see that Spark writes 36 files of approximately 5 MB each. The Athena documentation recommends an optimal file size of ~128 MB: https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/

from pyspark.sql import SparkSession

sparkSess = SparkSession.builder\
    .appName("testApp")\
    .config("spark.debug.maxToStringFields", "1000")\
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")\
    .getOrCreate()

sparkCtx = sparkSess.sparkContext

# read the newline-delimited JSON file as text and build a DataFrame with the known schema
deltaRdd = sparkCtx.textFile(filePath)
df = sparkSess.createDataFrame(deltaRdd, schema)

try:
    # partition the output by three fields and write JSON to S3
    df.write.partitionBy('field1', 'field2', 'field3')\
        .json(path, mode='overwrite', compression=compression)
except Exception as e:
    print(e)

Why is Spark writing such small files? Is there any way to control the file size?

Kapil
  • Can you post your pyspark code along with source for your pyspark job? – Prabhakar Reddy Dec 16 '19 at 08:23
  • Does this answer your question? [How do you control the size of the output file?](https://stackoverflow.com/questions/39187622/how-do-you-control-the-size-of-the-output-file) – blackbishop Dec 16 '19 at 13:28
  • No, it doesn't. There are mixed answers to that question: some say this size can be controlled using the fs.s3a.multipart.size / fs.s3a.block.size properties, but it doesn't work (or I'm missing something), while others say it can't be controlled at all, like the one posted above. It's confusing. – Kapil Dec 17 '19 at 07:22
  • @Kapil did you ever resolve this issue? What was your solution at the end? I am experiencing exactly the same behavior – cool May 17 '21 at 12:09
  • @cool It was more of a workaround. I repartitioned the data and limited the file size. I know the average record size; based on that, I'm setting maxRecordsPerFile to keep the file size from growing indefinitely. – Kapil May 18 '21 at 13:24
  • @Kapil Thank you for following on that. So, repartitioning data and maxRecordsPerFile from the pyspark app (python code) was enough to achieve this. You did not have to deal with fs multipart size configurations? – cool May 19 '21 at 00:07
  • @cool Yes, that was enough to limit the number of files. Now a new file is only created if the number of records exceeds the one specified. I have created a gist with that code; see if it helps: https://gist.github.com/kapilgarg/ed605408aee21166ba6394a483e1f260 – Kapil May 19 '21 at 05:24
  • @Kapil thanks again, this will help. Maybe it would be good for you to elaborate this discussion in an answer by yourself and mark it as accepted, just for the sake of resolution. – cool May 19 '21 at 09:07
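
A minimal sketch of the workaround Kapil describes in the comments above (not the linked gist itself): the field names and write options come from the question, while the target file size and average record size are assumed values used only to illustrate the calculation.

# Assumed figures for illustration; measure the real average record size of your data.
TARGET_FILE_BYTES = 128 * 1024 * 1024   # ~128 MB, the size the Athena docs recommend
AVG_RECORD_BYTES = 1024                 # assumed average size of one JSON record

max_records = TARGET_FILE_BYTES // AVG_RECORD_BYTES

# repartition by the same fields used in partitionBy and cap the records per output file
df.repartition('field1', 'field2', 'field3') \
    .write \
    .option('maxRecordsPerFile', max_records) \
    .partitionBy('field1', 'field2', 'field3') \
    .json(path, mode='overwrite', compression=compression)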

1 Answer


Is there any way to control the file size?

There are some control mechanisms; however, they are not explicit.

The S3 drivers are not part of Spark itself; they are part of the Hadoop installation that ships with Spark on EMR. The S3 block size can be set in the /etc/hadoop/core-site.xml config file.

By default, however, it should be around 128 MB.
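
If editing core-site.xml is not convenient, the same Hadoop property can also be passed through the Spark session builder with the spark.hadoop. prefix. A sketch, assuming the S3A filesystem is in use (128 * 1024 * 1024 bytes = 128 MB):

from pyspark.sql import SparkSession

# forward fs.s3a.block.size to the Hadoop configuration via the spark.hadoop. prefix
# instead of editing /etc/hadoop/core-site.xml
sparkSess = SparkSession.builder \
    .appName("testApp") \
    .config("spark.hadoop.fs.s3a.block.size", str(128 * 1024 * 1024)) \
    .getOrCreate()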

Why is Spark writing such small files?

Spark will adhere to the Hadoop block size. However, you can use partitionBy before writing.

Let's say you use df.write.partitionBy("date").csv("s3://products/"). Spark will create a subfolder for each date partition. Within each partitioned folder, Spark will again create chunks and try to adhere to fs.s3a.block.size.

e.g.

s3://products/date=20191127/00000.csv
s3://products/date=20191127/00001.csv
s3://products/date=20200101/00000.csv

In the example above, a particular partition can simply hold less data than the block size of 128 MB, which is why its file comes out smaller.
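
A minimal sketch of that kind of write (the date column and bucket path are taken from the illustration above, not from the question's schema):

# assumes df has a "date" column; Spark creates one folder per date value
df.write.partitionBy("date").csv("s3://products/", mode="overwrite")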

So just double-check your block size in /etc/hadoop/core-site.xml and whether you need to partition the DataFrame with partitionBy before writing.

Edit:

A similar post also suggests repartitioning the DataFrame to match the partitionBy scheme:

df.repartition('field1', 'field2', 'field3') \
    .write.partitionBy('field1', 'field2', 'field3') \
    .json(path, mode='overwrite', compression=compression)

DataFrameWriter.partitionBy operates on the existing DataFrame partitions; it will not repartition the original DataFrame. Hence, if the overall DataFrame is partitioned differently, each in-memory partition can write a separate file for every partitionBy value it contains, which multiplies the number of small files.
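
To see how many in-memory partitions will feed the writer, and hence roughly how many files each output folder can receive, you can inspect the partition counts. A sketch using the question's field names:

# each in-memory partition can emit one file per field1/field2/field3 combination
# it contains, so a high partition count explains a large number of small files
print(df.rdd.getNumPartitions())
print(df.repartition('field1', 'field2', 'field3').rdd.getNumPartitions())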

dre-hh
  • Even though the complete data corresponds to a single partition, the file size is still not 128 MB. I don't have any entry in core-site.xml, so I assume it is taking the default size, but after updating the property for multipart upload there is no change. – Kapil Dec 16 '19 at 11:14
  • did you check the hadoop s3 conf? what is the s3 block.size? – dre-hh Dec 16 '19 at 11:21
  • When I check the Spark EMR Hadoop conf, there is no setting for the S3 block.size at all; the files we write come out around 150 MB. Do a `cd /etc/hadoop/conf/` then run `grep -rni "s3" *` to check whether there is a `block.size` setting. – dre-hh Dec 17 '19 at 10:06
  • Just also saw that we repartition the dataframe to match the data writer scheme; added another hint to the answer. – dre-hh Dec 17 '19 at 10:11