
I am attempting to use Spark for a very simple use case: given a large set of files (90k) containing device time-series data for millions of devices, group all of the time-series reads for a given device into a single set of files (a partition). For now let's say we are targeting 100 partitions, and it is not critical that a given device's data shows up in the same output file, just the same partition.

Given this problem, we've come up with two ways to do this: repartition then write, or write with partitionBy applied to the DataFrameWriter. The code for either of these is very simple:

repartition (a hash column is added to ensure that the comparison to the partitionBy code below is one-to-one):


from pyspark.sql.functions import col, hash  # needed for the hash-based partition column

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .repartition("partition") \
  .write.format("json") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

partitionBy:


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .write.format("json") \
  .partitionBy("partition") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

In our testing, repartition is 10x faster than partitionBy. Why is this?

Based on my understanding, repartition will incur a shuffle, which everything I've learned about Spark says to avoid whenever possible. On the other hand, partitionBy (again, based on my understanding) only incurs a sort operation local to each node, with no shuffle needed. Am I misunderstanding something that is causing me to think partitionBy would be faster?

Robin Zimmerman

2 Answers


TL;DR: Spark triggers a sort, not a hash re-partitioning, when you call partitionBy. This is why it is much slower in your case.

We can check that with a toy example:

spark.range(1000).withColumn("partition", 'id % 100)
    .repartition('partition).write.csv("/tmp/test.csv")

[DAG 1: repartition job]

Don't pay attention to the grey stage, it is skipped because it was computed in a previous job.

Then, with partitionBy:

spark.range(1000).withColumn("partition", 'id % 100)
    .write.partitionBy("partition").csv("/tmp/test2.csv")

[DAG 2: partitionBy job]

You can check that adding a repartition before the partitionBy does not remove the sort; it will still be there. So what's happening? Notice that the sort in the second DAG does not trigger a shuffle: it is a map-side operation on each partition. In fact, when you call partitionBy, Spark does not shuffle the data as one would expect at first. Spark sorts each partition individually, and then each executor writes its data into the corresponding output partition, in a separate file. Therefore, note that with partitionBy you are not writing num_partitions files but something between num_partitions and num_partitions * num_executors files: each output partition has one file per executor containing data belonging to that partition.
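
For readers who want to reproduce this from PySpark (the language used in the question), here is a minimal sketch of the same toy example plus a quick way to count the files each variant produces. The output paths are made up for illustration, and the file-count check assumes a local filesystem:

import glob
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

toy = spark.range(1000).withColumn("partition", F.col("id") % 100)

# Variant 1: hash repartition on the column, then a plain write.
toy.repartition("partition").write.mode("overwrite").csv("/tmp/test_repartition")

# Variant 2: no repartition, partitionBy on the writer.
toy.write.mode("overwrite").partitionBy("partition").csv("/tmp/test_partitionby")

# Count the part files each variant produced (local filesystem only).
print(len(glob.glob("/tmp/test_repartition/part-*")))
print(len(glob.glob("/tmp/test_partitionby/partition=*/part-*")))

The first count is roughly bounded by the number of distinct values (at most about 100 non-empty files), while the second can grow up to the number of input partitions times 100, which is exactly the file explosion discussed in the comments below.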

Oli
  • @Oli It is still strange to me that a per-executor sort is so much slower than the shuffle associated with the hash repartitioning. Any thoughts on that and any ideas of how to make it faster? – Robin Zimmerman Nov 15 '21 at 14:47
  • I was thinking about it when writing the answer and it is true that this is strange. You are saying that you have 90k files. How many records per file? or in total? – Oli Nov 15 '21 at 15:16
  • Yes, 90k files. Median file size is 5MB with 4267 records. I tried doing `coalesce` before `partitionBy` and that did improve performance, but it was still ~3x slower than `repartition`. – Robin Zimmerman Nov 15 '21 at 16:26
  • Is spark running in local or on a cluster? – Oli Nov 15 '21 at 17:46
  • 12 node cluster. – Robin Zimmerman Nov 15 '21 at 17:51
  • And last question, how much time approximately do both job take? – Oli Nov 16 '21 at 17:28
  • around 35 minutes for the `repartition` version, 300 minutes for `partitionBy` (and I looked again and I think I was mistaken about the `coalesce` helping performance. Not sure it did anything.) – Robin Zimmerman Nov 16 '21 at 19:49
  • OK, I know what's happening. It is simply some (huge) file creation overhead. You have 90k files so when spark reads them, you don't have 90k partitions but probably something like 10k (it was the case in my test). When you use `partitionBy`, that creates one directory per value of the column, containing one file per partition containing that value. In your case, that could be up to 1M files compared to 100 files with `repartition`. Note also that spark writes in two phases (`_temporary` file first and then the definitive files). That multiplies the overhead by 2. – Oli Nov 17 '21 at 17:08
  • You may notice in the Spark history that Spark stops working way earlier than the actual end of the job. Hadoop is then just copying files around, and that part does not seem well optimized. Using `coalesce` or `repartition` beforehand would definitely improve the performance. – Oli Nov 17 '21 at 17:10
  • thanks for your suggestion, but I don't believe the above is correct. As you can see from my code I added a hash column called "partition" that should only have 100 distinct values: `hash(col("_DeviceName")).cast("Long") % num_partitions`. Then I call `partitionBy` on this column, not the original. – Robin Zimmerman Nov 18 '21 at 23:05

I think @Oli has explained the issue perfectly in his comments to the main answer. I just want to add my two cents and try to explain the same thing.

Let's say that when you are reading the XML files [90K files], Spark reads them into N partitions. This is decided based on a number of factors like spark.sql.files.maxPartitionBytes, the file format, the compression type, etc.

Let's assume it is 10K partitions. This happens in the part of the code below:

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
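
If you want to see what N actually is for your data, a quick sketch (reusing file_path and meter_data from the question) is to read the files and ask the resulting dataframe for its partition count:

# Sketch: inspect how many input partitions Spark creates for the 90K files.
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))  # split size, 128 MB by default

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data)

print(df.rdd.getNumPartitions())  # this is the N discussed above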

Assuming you are using num_partitions = 100, you are adding a new column called partition with values 0-99. Spark is just adding a new column to the existing dataframe [or rdd] which is split across the 10K partitions.

.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \

Up to this point, both versions of the code are the same.

Now, let's compare what happens with repartition vs. partitionBy.

Case 1: repartition

.repartition("partition") \
.write.format("json") \

Here, you are repartitioning the existing dataframe based on the column "partition", which has 100 distinct values. So the existing dataframe incurs a full shuffle, bringing the number of partitions down from 10K to 100. This stage is compute-heavy since a full shuffle is involved. It could also fail if one particular partition is really huge [a skewed partition].

But the advantage here is that in the next stage, where the write happens, Spark has to write only 100 files to the output_path. Each file will only have data corresponding to one value of the column "partition".
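
One nuance worth noting: repartition("partition") actually uses spark.sql.shuffle.partitions (200 by default) as the target partition count, and the 100 hash values simply land in at most 100 of those partitions. If you prefer the target count to be explicit rather than dependent on that setting, repartition also accepts a number together with the column. A sketch, assuming df is the dataframe with the hash column already added as in the question:

# Sketch: make the target partition count explicit instead of relying on
# spark.sql.shuffle.partitions.
df.repartition(num_partitions, "partition") \
  .write.format("json") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)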

Case 2: partitionBy

.write.format("json") \
.partitionBy("partition") \

Here, you are asking Spark to write the existing dataframe into output_path, partitioned by the distinct values of the column "partition". Nowhere are you asking Spark to reduce the existing partition count of the dataframe.

So Spark will create new folders inside the output_path and write the data corresponding to each partition inside them.

output_path + "/partition=0/"
output_path + "/partition=1/"
...
output_path + "/partition=99/"

Now, since you have 10K Spark partitions in the existing dataframe, and assuming the worst case where each of these 10K partitions contains all of the distinct values of the column "partition", Spark will have to write 10K * 100 = 1M files. That is, some part of each of the 10K partitions will be written to each of the 100 folders created for the column "partition". This way Spark writes 1M files to the output_path, spread across the sub-directories it creates. The advantage is that we skip a full shuffle with this method.

Compared to the in-memory, compute-intensive shuffle in Case 1, this will be much slower, since Spark has to create 1M files and write them to persistent storage. On top of that, it first writes them to a temporary folder and only then moves them to the output_path.

This will be even slower if the write is going to an object store like AWS S3 or GCS.
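
Before actually writing anything, you can estimate the Case 2 worst case by counting, per Spark partition, how many distinct values of the "partition" column it contains; the total across partitions approximates the number of files partitionBy would create. A sketch, again assuming df is the dataframe with the hash column added:

from pyspark.sql import functions as F

# Each Spark partition writes (roughly) one file per distinct "partition" value
# it holds, so summing the per-partition distinct counts estimates the file count.
estimated_files = df.withColumn("spark_part_id", F.spark_partition_id()) \
  .groupBy("spark_part_id") \
  .agg(F.countDistinct("partition").alias("distinct_values")) \
  .agg(F.sum("distinct_values").alias("estimated_files")) \
  .first()["estimated_files"]

print(estimated_files)  # up to (number of Spark partitions) * 100 in the worst case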

Case 3: coalesce + partitionBy

.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \

In this case, you will be reducing the number of spark partitions from 10K to 100 with coalesce() and writing it to output_path partitioned by column "partition".

So, assuming the worst case where each of these 100 partitions has all the distinct values of the column "partition", spark will have to write 100 * 100 = 10K files.

This will still be faster than Case 2, but slower than Case 1. That is because coalesce() only merges existing partitions rather than doing a full shuffle, yet you still end up writing up to 10K files to output_path.

Case 4: repartition+ partitionBy

.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \

In this case, you will be reducing the number of spark partitions from 10K to 100 [distinct values of column "partition"] with repartition() and writing it to output_path partitioned by column "partition".

So, since each of these 100 partitions holds only one distinct value of the column "partition", Spark will have to write 100 * 1 = 100 files. Each sub-folder created by partitionBy() will only have one file inside it.

This will take roughly the same time as Case 1, since both cases involve a full shuffle and then writing 100 files. The only difference is that here the 100 files will be inside sub-folders under the output_path.

This layout is also useful for partition pruning (predicate push-down on the partition column) when reading the output_path back via Spark or Hive.
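
As a small sketch of what that buys you when reading back (the filter value is arbitrary), Spark discovers the partition column from the directory names and only scans the matching sub-folder:

# Sketch: reading back the Hive-style partitioned output and pruning by partition.
df_back = spark.read.json(output_path)
df_back.where("partition = 42").explain()  # the plan should show a PartitionFilters entry for partition = 42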

Conclusion:

Even though partitionBy avoids the shuffle that repartition incurs, and so looks cheaper on paper, depending on the number of dataframe partitions and the distribution of data inside those partitions, using partitionBy alone might end up being far more costly.

Rinaz Belhaj
  • Marking this as accepted answer as I think it better defines the true reason why partitionBy is slower. Oli's answer was helpful but I didn't get the satisfactory feeling I was looking for like I did after reading this. – Robin Zimmerman Dec 02 '21 at 23:32