Is this somehow related to any of these questions?
- Why are Spark Parquet files for an aggregate larger than the original?
- Why does the repartition() method increase file size on disk?
- Why can't I reduce the total file size by merging small ORC files?
I have a folder of ORC files, about 3.9 GB on disk as reported by du -sh. All files have the same schema and are very small (roughly 128 KB each). I believe they contain a lot of duplicate rows, so I'd like to clean them up.
$ du -sh /pool/in
3.9G /pool/in
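To double-check the "lots of small files" part, I also looked at the per-file sizes on the driver host. This is just a local-filesystem sketch and assumes the ORC files sit directly in /pool/in:

import glob, os

# sizes of the individual input files, skipping _SUCCESS-style markers and hidden files
sizes = [os.path.getsize(p) for p in glob.glob("/pool/in/*")
         if not os.path.basename(p).startswith(("_", "."))]
print(len(sizes), "files, average", sum(sizes) / len(sizes) / 1024, "KB")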
I read the entire folder using this script:
# script.py
import pyspark.sql
import pyspark.sql.functions
instance_conf = pyspark.SparkConf()
instance_conf.set('spark.local.dir', '/pool/spark_temp')
instance_conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark = pyspark.sql.SparkSession.builder.config(conf=instance_conf).getOrCreate() # create a new session on this host
dirname = "/pool/in"
dirname_out = "/pool/out"
dataframe = spark.read.orc(dirname)
dataframe_out = dataframe.dropDuplicates() # across all columns
dataframe_out.count() # forces the deduplication to run; the result itself isn't used
dataframe_out.write.mode("overwrite").orc(dirname_out)
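One thing I have not ruled out is the compression codec: as far as I know Spark writes ORC with snappy unless spark.sql.orc.compression.codec says otherwise, and I don't know what codec the input files were written with. A minimal check, run in a pyspark shell against the same install; the "zlib" in the commented line is only an illustration of forcing a codec, not what my input actually uses:

# what codec will this session use when writing ORC? (default should be snappy)
print(spark.conf.get("spark.sql.orc.compression.codec", "snappy"))

# forcing a specific codec on the write for comparison, e.g. zlib
# dataframe_out.write.mode("overwrite").option("compression", "zlib").orc(dirname_out)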
I ran spark-submit with these settings:
/usr/local/spark-3.4.1-bin-hadoop3/bin/spark-submit --driver-memory 4g --executor-memory 2g --executor-cores 2 script.py
And the output folder is now 12GB!
$ du -sh /pool/out
12G /pool/out
I was not expecting the dropDuplicates() operation to roughly triple the size of the data on disk. Are the executor cores reading the data in twice? Why is this happening?
I noticed something was off when even /pool/out/_temporary was growing in size.
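To check whether the data really gets read twice, I looked at the physical plan in the same session. As I understand it, dropDuplicates() over all columns becomes a hash aggregate with one shuffle (Exchange) rather than a second scan, but I may be reading the plan wrong:

# print the physical plan for the deduplication; I expect a single file scan
# followed by HashAggregate / Exchange (shuffle) stages
spark.read.orc("/pool/in").dropDuplicates().explain()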
Just to add more to the confusion...
The number of files in each directory also differs:
$ ls /pool/in | wc -l
10626
$ ls /pool/out | wc -l
201
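The 201 entries in /pool/out look suspiciously like the default 200 shuffle partitions plus a _SUCCESS marker, which would fit a shuffle introduced by dropDuplicates(). A quick check (200 is the documented default of spark.sql.shuffle.partitions; I haven't changed it anywhere):

# number of partitions used after a shuffle, and presumably the number of
# part files the deduplicated write produces
print(spark.conf.get("spark.sql.shuffle.partitions", "200"))

# 12 GB over ~200 part files is about 60 MB per output file, versus a few hundred KB
# at most per input file, so the growth is in bytes per row, not in row count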
The deduplication itself did work: each id is its own group of data, and the number of distinct ids is the same before and after.
>>> x = spark.read.orc("/pool/out")
>>> x.count()
401149568
>>> x = spark.read.orc("/pool/in")
>>> x.count()
467271623
>>> x = spark.read.orc("/pool/out")
>>> x.select("id").distinct().count()
21174972
>>> x = spark.read.orc("/pool/in")
>>> x.select("id").distinct().count()
21174972