
What is the difference between DataFrame repartition() and DataFrameWriter partitionBy() methods?

I assume both are used to "partition the data based on a dataframe column". Or is there any difference?

Shankar
  • For anyone coming to this question, [this one](https://stackoverflow.com/q/48356425/3679900) might also be relevant – y2k-shubham Aug 25 '19 at 07:06

3 Answers


Watch out: I believe the accepted answer is not quite right! I'm glad you asked this question, because the behavior of these similarly named functions differs in important and unexpected ways that are not well documented in the official Spark documentation.

The first part of the accepted answer is correct: calling df.repartition(COL, numPartitions=k) will create a dataframe with k partitions using a hash-based partitioner. COL here defines the partitioning key: it can be a single column or a list of columns. The hash-based partitioner takes each input row's partition key and hashes it into a space of k partitions via something like partition = hash(partitionKey) % k. This guarantees that all rows with the same partition key end up in the same partition. However, rows with different partition keys can also end up in the same partition (when a hash collision between the partition keys occurs), and some partitions might be empty.
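
To make this concrete, here is a minimal PySpark sketch (the toy dataframe, the day column, and k=5 are made up purely for illustration) that tags each row with its partition id so you can see which keys share a partition:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").appName("repartition-demo").getOrCreate()

# Toy dataframe: one row per day for 7 distinct days
df = spark.createDataFrame([("2023-01-%02d" % d,) for d in range(1, 8)], ["day"])

# Repartition on 'day' into k=5 partitions, then record each row's partition id
(df.repartition(5, "day")
   .withColumn("partition_id", F.spark_partition_id())
   .groupBy("partition_id")
   .agg(F.collect_set("day").alias("days_in_partition"))
   .show(truncate=False))

# With 7 keys hashed into 5 buckets, at least one partition must hold multiple days,
# and some of the 5 partitions may come out empty.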

In summary, the unintuitive aspects of df.repartition(COL, numPartitions=k) are that

  • partitions will not strictly segregate partition keys
  • some of your k partitions may be empty, whereas others may contain rows from multiple partition keys

The behavior of df.write.partitionBy is quite different, in a way that many users won't expect. Let's say that you want your output files to be date-partitioned, and your data spans 7 days. Let's also assume that df has 10 partitions to begin with. When you run df.write.partitionBy('day'), how many output files should you expect? The answer is 'it depends'. If each of your starting partitions in df contains data from every day, then the answer is 70. If each of your starting partitions in df contains data from exactly one day, then the answer is 10.

How can we explain this behavior? When you run df.write, each of the original partitions in df is written independently. That is, each of your original 10 partitions is sub-partitioned separately on the 'day' column, and a separate file is written for each sub-partition.
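
To reproduce the 'it depends' behavior, here is a small sketch (continuing with the spark session from the sketch above; the path and row counts are made up). It builds 10 input partitions that each contain rows from all 7 days, so partitionBy('day') fans out to roughly 70 files:

from pyspark.sql import functions as F

# 700 rows spread over 7 'days'; repartition(10) spreads rows roughly evenly,
# so each of the 10 input partitions is likely to contain rows for all 7 days
df = (spark.range(0, 700)
           .withColumn("day", (F.col("id") % 7).cast("string"))
           .repartition(10))

df.write.mode("overwrite").partitionBy("day").csv("/tmp/partitionby_demo")

# Each input partition writes its own file into every day=... directory it has data for,
# so /tmp/partitionby_demo/day=0/ ... day=6/ together hold roughly 70 part files.
# If each input partition held exactly one day instead, you would get only about 10 files.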

I find this behavior rather annoying and wish there were a way to do a global repartitioning when writing dataframes.

conradlee
  • +1. Just to make the nice example of partitionBy more explicit, you can imagine it as a `group by partition, col1, col2...`. That will give you how many files will be written. – Farah May 07 '18 at 08:06
  • Wonderful answer and +50 for the "differs in important and unexpected ways that are not well documented in the official spark documentation." My question: is there a way to hack what you describe in the last sentence? Something like `df.write().repartition(COL).partitionBy(COL)`? What I'm aiming for is the `partitionBy()` behavior, but with roughly the same file size and number of files as I had originally. Can this be easily accomplished? The `partitionBy(date)` => 70 files example is relevant. I would want ~10 files, one for each day, and maybe 2 or 3 for days that have more data. – seth127 Jun 05 '18 at 14:37
  • @seth127 - I have some ideas, but they'll take a bit of space to explain. Write your question up as an official question and I'll write you an answer. – conradlee Jun 07 '18 at 07:44
  • @conradlee ok, here it is: https://stackoverflow.com/questions/50775870/pyspark-efficiently-have-partitionby-write-to-same-number-of-total-partitions-a Thanks in advance! – seth127 Jun 09 '18 at 15:35
  • @conradlee "partitions will not strictly segregate partition keys" - I don't get your point, as the hashing guarantees that the same hash value gives the same result, as you state as well, so what am I missing? – thebluephantom Aug 22 '18 at 09:53
  • @thebluephantom let's say you've got 1000 days worth of data and you want to partition on your date column, so you run `df.repartition(df.date, 1000)`. Many people expect each partition to contain exactly one day of data. However, some of the 1000 partitions will be empty, and other partitions will contain multiple days worth of data. Many people find that unintuitive (maybe you don't, and hence the confusion). – conradlee Aug 22 '18 at 11:07
  • OK gotcha, then as we were. Some formats offer that on write, if memory serves well. – thebluephantom Aug 22 '18 at 11:10
  • Has anyone delved into how likely a hash collision realistically is? I understand that when working with Spark we often deal with real Big Data - billions and billions of records. However, I would expect collisions to happen very rarely even in this case. – mmierins Dec 08 '18 at 21:03
  • @mmierins In this setting, 'partition collisions' are very common because the partition assignment function is something like `hash(partition_key) % N_PARTITIONS`. So the range of the hash algorithm is collapsed down into a space of `N_PARTITIONS`. Given that `N_PARTITIONS` is often in the range of a few hundred to a few tens of thousands, collisions can be common. – conradlee Dec 10 '18 at 08:11
  • [This is also a very nice description](https://kontext.tech/column/spark/296/data-partitioning-in-spark-pyspark-in-depth-walkthrough). – Mike Williamson Aug 28 '20 at 14:23
  • I think an important thing to remember here is that Spark is not aimed at datasets containing 70 records, but at millions and much more. So if your partitioning column is well distributed, the partitions will be of approximately equal size. – Eva Lond Sep 08 '21 at 07:49
  • @Ingvar Lond: This will be true if you repartition without specifying any column to partition on, e.g., `df.repartition()`. However, once you specify a column, the distribution of rows to partitions depends on how many rows have each column value. Increasing dataset size won't always overcome data skew issues. Sometimes it will, but it totally depends on the idiosyncrasies of your dataset. – conradlee Sep 08 '21 at 09:39

If you run repartition(COL) you change the partitioning during calculations: you will get spark.sql.shuffle.partitions (default: 200) partitions. If you then call .write, you will get one directory with many files.

If you run .write.partitionBy(COL) then as the result you will get as many directories as there are unique values in COL. This speeds up further data reading (if you filter by the partitioning column) and saves some space on storage (the partitioning column is removed from the data files).
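
A short sketch of the contrast in PySpark (the output paths are made up, and df is assumed to be a dataframe with a country column standing in for COL):

# repartition(COL): one output directory containing up to spark.sql.shuffle.partitions
# part files, and a single file may mix several countries that hashed to the same partition
df.repartition("country").write.csv("/tmp/out_repartitioned")

# write.partitionBy(COL): one sub-directory per distinct country; the country column
# itself is dropped from the data files because it is encoded in the directory names
df.write.partitionBy("country").csv("/tmp/out_partitionedBy")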

UPDATE: See @conradlee's answer. He explains in detail not only what the directory structure will look like after applying the different methods, but also what the resulting number of files will be in both scenarios.

Mariusz

repartition() is used to partition data in memory and partitionBy is used to partition data on disk. They're often used in conjunction.

Both repartition() and partitionBy can be used to "partition data based on dataframe column", but repartition() partitions the data in memory and partitionBy partitions the data on disk.

repartition()

Let's play around with some code to better understand partitioning. Suppose you have the following CSV data.

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

df.repartition(col("country")) will repartition the data by country in memory.

Let's write out the data so we can inspect the contents of each memory partition.

val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
  .write
  .csv(outputPath)

Here's how the data is written out on disk:

partitioned_by_country/
  part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv

Each file contains data for a single country in this example; the part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv file, for instance, contains this China data:

Bruce,Lee,China
Jack,Ma,China

partitionBy()

Let's write out data to disk with partitionBy and see how the filesystem output differs.

Here's the code to write out the data to disk partitions.

val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
  .write
  .partitionBy("country")
  .csv(outputPath)

Here's what the data looks like on disk:

partitionedBy_disk/
  country=Argentina/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
  country=China/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
  country=Russia/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv

Why partition data on disk?

Partitioning data on disk can make certain queries run much faster.
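
For example, a filter on the partitioning column lets Spark skip whole directories instead of scanning every file. A minimal sketch in PySpark (assuming an active SparkSession named spark and the partitioned output path from above):

# Reading the partitionBy() output: Spark discovers the 'country' column from the
# country=... directory names
df = spark.read.csv("./tmp/partitionedBy_disk")

# Filtering on the partition column prunes directories, so only country=China/ is scanned
df.filter(df["country"] == "China").show()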

Powers