
I am trying to write out a large partitioned dataset to disk with Spark, and partitionBy is struggling with both of the approaches I've tried.

The partitions are heavily skewed - some of the partitions are massive and others are tiny.

Problem #1:

When I use repartition before partitionBy, Spark writes each partition out as a single file, even the huge ones.

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")

This takes forever to execute because Spark isn't writing the big partitions in parallel. If one of the partitions has 1 TB of data, Spark will try to write the entire 1 TB of data as a single file.

Problem #2:

When I don't use repartition, Spark writes out way too many files.

This code will write out an insane number of files.

df.write.partitionBy("some_col").parquet("partitioned_lake")

I ran this on a tiny 8 GB data subset and Spark wrote out 85,000+ files!

When I tried running this on a production data set, one partition that has 1.3 GB of data was written out as 3,100 files.

What I'd like

I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.

What is my best path forward?

Etienne Neveu · Powers

4 Answers


The simplest solution is to add one or more columns to repartition and explicitly set the number of partitions.

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
 .write.partitionBy("some_col")
 .parquet("partitioned_lake")

where:

  • numPartitions - should be an upper bound (the actual number can be lower) on the desired number of files written to a partition directory.
  • $"some_other_col" (and optional additional columns) should have high cardinality and be independent of $"some_col" (there should be no functional dependency between the two, and they shouldn't be highly correlated).

    If the data doesn't contain such a column, you can use o.a.s.sql.functions.rand.

    import org.apache.spark.sql.functions.rand
    
    df.repartition(numPartitions, $"some_col", rand)
      .write.partitionBy("some_col")
      .parquet("partitioned_lake")
    
10465355
  • I have the exact same situation, so I went with salting, and I want no more than 5 files per partition. But what confuses me is that I see 5 tasks at the saving stage, and that's what I don't quite understand: I thought the data should be balanced, but also organized in a way that utilizes cluster resources best. – jk1 Apr 18 '19 at 16:31
  • @10465355 can you expand on "there should be no functional dependency between these two"? Thanks! – Joffer Dec 19 '21 at 18:26
  • In light of rand's being an acceptable substitute, I should add. – Joffer Dec 19 '21 at 18:33

I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.

The currently accepted answer is probably good enough most of the time, but doesn't quite deliver on the request that the 0.3 GB partition get written out to a single file. Instead, it will write out numPartitions files for every output partition directory, including the 0.3 GB partition.

What you're looking for is a way to dynamically scale the number of output files by the size of the data partition. To do that, we'll build on 10465355's approach of using rand() to control the behavior of repartition(), and scale the range of rand() based on the number of files we want for that partition.

It's difficult to control partitioning behavior by output file size, so instead we'll control it using the approximate number of rows we want per output file.
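If the goal is roughly 1 GB files, a back-of-the-envelope conversion gives a usable row count. This is just a sketch; both input figures below are made-up placeholders you'd look up for your own data, and output compression can shift the result:

target_file_bytes = 1024 ** 3        # aim for ~1 GB per output file
total_input_bytes = 8 * 1024 ** 3    # placeholder: e.g. the 8 GB subset from the question
total_rows = 80_000_000              # placeholder: row count of that same input

# Average bytes per row in the input; treat the result as an approximation
# and round it to something convenient.
bytes_per_row = total_input_bytes / total_rows
desired_rows_per_output_file = int(target_file_bytes / bytes_per_row)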

I'll provide a demonstration in Python, but the approach is basically the same in Scala.

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.getOrCreate()
skewed_data = (
    spark.createDataFrame(
        [(1,)] * 100 + [(2,)] * 10 + [(3,), (4,), (5,)],
        schema=['id'],
    )
)
partition_by_columns = ['id']
desired_rows_per_output_file = 10

partition_count = skewed_data.groupBy(partition_by_columns).count()

partition_balanced_data = (
    skewed_data
    .join(partition_count, on=partition_by_columns)
    .withColumn(
        'repartition_seed',
        (
            rand() * partition_count['count'] / desired_rows_per_output_file
        ).cast('int')
    )
    .repartition(*partition_by_columns, 'repartition_seed')
)

This approach will balance the size of the output files, no matter how skewed the partition sizes are. Every data partition will get the number of files it needs so that each output file has roughly the requested number of rows.

A prerequisite of this approach is calculating the size of each partition, which you can see in partition_count. It's unavoidable if you really want to dynamically scale the number of output files per partition.
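For completeness, here is the write step that pairs with the code above; it's the same partitionBy write as before, and only the output path is a placeholder:

(
    partition_balanced_data
    .drop('count', 'repartition_seed')  # optional: drop the helper columns before writing
    .write
    .partitionBy(*partition_by_columns)
    .parquet('partitioned_lake')  # placeholder output path
)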

To demonstrate this is doing the right thing, let's inspect the partition contents:

from pyspark.sql.functions import spark_partition_id

(
    skewed_data
    .groupBy('id')
    .count()
    .orderBy('id')
    .show()
)

(
    partition_balanced_data
    .select(
        *partition_by_columns,
        spark_partition_id().alias('partition_id'),
    )
    .groupBy(*partition_by_columns, 'partition_id')
    .count()
    .orderBy(*partition_by_columns, 'partition_id')
    .show(30)
)

Here's what the output looks like:

+---+-----+
| id|count|
+---+-----+
|  1|  100|
|  2|   10|
|  3|    1|
|  4|    1|
|  5|    1|
+---+-----+

+---+------------+-----+
| id|partition_id|count|
+---+------------+-----+
|  1|           7|    9|
|  1|          49|    6|
|  1|          53|   14|
|  1|         117|   12|
|  1|         126|   10|
|  1|         136|   11|
|  1|         147|   15|
|  1|         161|    7|
|  1|         177|    7|
|  1|         181|    9|
|  2|          85|   10|
|  3|          76|    1|
|  4|         197|    1|
|  5|          10|    1|
+---+------------+-----+

As desired, each output file has roughly 10 rows. id=1 gets 10 partitions, id=2 gets 1 partition, and id={3,4,5} each get 1 partition.

This solution balances the output file sizes, regardless of data skew, and without limiting parallelism by relying on maxRecordsPerFile.
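For contrast, here is roughly what the simpler maxRecordsPerFile route looks like. It caps the rows per file, but when paired with a repartition on the partition column (so each partition value lands in one task), that single task still writes a huge partition's files one after another. A minimal sketch, with the row limit shrunk to fit the toy skewed_data and a placeholder output path:

(
    skewed_data
    .repartition('id')  # one task per id value, as in the question's Problem #1
    .write
    .option('maxRecordsPerFile', 10)  # toy limit; use something much larger in practice
    .partitionBy('id')
    .parquet('partitioned_lake_max_records')  # placeholder output path
)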

Nick Chammas
  • When you use `rand() * partition_count['count'] / desired_rows_per_output_file` - if there are a large number of rows per partition, will this not result in output file rows much greater than desired_rows_per_output_file? e.g. if rand generates 0.9 and the partition for an id has 20000 entries, then 0.9*20000/10 = 1800 rows for that partition? Thanks – Mike91 Feb 20 '22 at 12:15
  • @Mike91 - Nope. The calculation is to generate a `repartition_seed`, a random number within a range that is scaled to the size of the partition. So if there are 20,000 rows in a partition, and you actually want 10 rows per output file, then the `repartition_seed` calculation will generate an even distribution of integers from 0 to 2,000 (i.e. 20,000 / 10). If 20,000 rows are being distributed evenly across 2,000 files, then you'll get 10 rows per file as desired. – Nick Chammas Feb 28 '22 at 17:14
  • Ah, makes sense! Thanks for the reply. – Mike91 Mar 01 '22 at 19:39
  • Are the partition sizes retained when doing `.partitionBy` during the write job? I am trying to achieve 10k records per partition/file but after the write job the partitions seems to get merged. – Ignaski Jun 01 '23 at 07:10
  • @Ignaski - Post a new question demonstrating the problem on 10 or 100 rows and it should be easy to spot the problem (if any). You can use `skewed_data` from my example as a starting point. – Nick Chammas Jun 02 '23 at 18:25
  • This is a clever approach! I think it has a downside in that there can be large skew in the lower range of `repartition_seed`. For example, let's say you have 1000 partitions, each with fewer records than `desired_rows_per_output_file`; they'll all end up with `repartition_seed == 0` and therefore in the same in-memory Spark partition, which could be a problem. A small tweak is to add a partition-specific offset to `repartition_seed`. I'll post that as a separate answer. – santon Jul 03 '23 at 18:47

An alternative to Nick Chammas' method is to create a row_number() column partitioned by the primary partition key and then floor-divide it by the exact number of records that you want to appear in each output file. Expressed in Spark SQL it looks like the following:

SELECT /*+ REPARTITION(id, file_num) */
  id,
  FLOOR(ROW_NUMBER() OVER(PARTITION BY id ORDER BY NULL) / rows_per_file) AS file_num
FROM skewed_data

The added benefit of this is that it allows you to colocate the majority of the data in one partition across files by using the ORDER BY clause on a secondary key. Secondary keys are not guaranteed to be colocated if the row numbers associated with a secondary key span across two file_num values. It is also possible, and in fact somewhat likely, to end up with one file with few records in each partition.
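For reference, here is a rough DataFrame-API rendering of the same idea in Python, reusing the skewed_data frame from the previous answer; rows_per_file and the output path are placeholders:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

rows_per_file = 10  # placeholder for the desired records per file

# Number the rows within each id (the order within an id doesn't matter here)
# and floor-divide to get a file number, mirroring the SQL above.
w = Window.partitionBy('id').orderBy('id')
with_file_num = skewed_data.withColumn(
    'file_num',
    F.floor(F.row_number().over(w) / rows_per_file),
)

(
    with_file_num
    .repartition('id', 'file_num')  # plays the role of the REPARTITION hint
    .drop('file_num')
    .write
    .partitionBy('id')
    .parquet('partitioned_lake_row_number')  # placeholder output path
)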

jofrepp
  • The biggest benefit is not the colocation, though, as you can't guarantee where the boundaries occur. The biggest benefit is that this approach uses one stage fewer than Nick Chammas' approach and so is more performant. Adding to the performance gain is the lack of the join, which means fewer equality checks. – Oliver W. Apr 23 '21 at 22:13

Nick Chammas has posted a clever approach, which should work in many cases. However, it has the disadvantage that it may not scale if there are many partitions. For example, if you have 100 data partitions, then a portion of each data partition will end up in the same in-memory Spark partition (i.e., the one handling repartition_seed == 0). If you have many data partitions, this can exceed the memory of the Spark worker.

This solution is a bit more complicated, but it will ensure that different data partitions are not shuffled to the same in-memory Spark partitions.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

partition_col = "partition"
num_rows_in_output = 100_000

# Compute the number of records in each partition.
# Then, convert that into a number of desired output files:
partition_counts = (
    df
    .groupby(partition_col).count()
    .withColumn("num_files", F.ceil(F.col("count") / num_rows_in_output))
    # Cumulative sum to use as an offset:
    .withColumn(
        "file_offset",
        F.sum("num_files").over(Window.rowsBetween(Window.unboundedPreceding, -1)),
    )
    .na.fill(0, "file_offset")
    .cache()
)

# Use the custom partitioning to write the files:
(
    df
    .join(partition_counts, on=partition_col)
    .withColumn(
        'repartition_seed',
        F.floor(F.rand() * F.col("num_files")) + F.col("file_offset")
    )
    .repartition("repartition_seed")
    .write
    .partitionBy(partition_col)
    .parquet("/path/to/output")
)

There's still a problem, though. The .repartition("repartition_seed") will hash the repartition_seed and distribute rows based on the hash value. What we really want is to use the repartition_seed value directly. The problem is that two different repartition_seed values can hash to the same value and end up in the same in-memory Spark partition. At best, the data come from two different data partitions and are still written as two files of the right size. At worst, though, they come from the same data partition and end up creating a file that's 2x what you want. (It's possible for 3 or more of these hash collisions to occur, giving you files that are 3x larger, and so on.)
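A quick sanity check for such collisions (just a sketch; repartitioned is a hypothetical name for the DataFrame right after .repartition("repartition_seed") in the block above) is to count how many distinct repartition_seed values share an in-memory partition:

from pyspark.sql.functions import spark_partition_id

(
    repartitioned  # hypothetical: the DataFrame right after .repartition("repartition_seed")
    .select('repartition_seed', spark_partition_id().alias('partition_id'))
    .distinct()
    .groupBy('partition_id')
    .count()
    .filter('count > 1')  # partitions holding more than one distinct seed are collisions
    .show()
)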

The fix, it turns out, is to temporarily use an RDD, which does allow you to specify the exact partitioning. I've tested the following on a 500-million-record dataset with skewed partitions that I'm working with, and the approach works well.

Here's the entire function:

import pyspark.sql.functions as F
from pyspark.sql.window import Window


def repartition_within_partition(
    df: "pyspark.sql.dataframe.DataFrame",
    partition_col,
    records_per_partition: int,
) -> "pyspark.sql.dataframe.DataFrame":
    """Repartition data such that files are the same size, even across partitions.

    :param df: The DataFrame to repartition, partition, and then write.
    :param partition_col: The column(s) on which to partition.
    :param records_per_partition: The approximate number of records to store in each file.
    :return: A DataFrame that's ready to be written.

    Examples:
        >>> (
        ...     repartition_within_partition(df, "partition", 100_000)
        ...     .write.partitionBy("partition").parquet("/path/to/directory")
        ... )
    """
    # The record count per partition, plus the fields we need to compute the partitioning:
    partition_counts = (
        df.groupby(partition_col)
        .count()
        # The number of files to write for this partition:
        .withColumn("num_files", F.ceil(F.col("count") / records_per_partition))
        # The file offset is the cumulative sum of the number of files:
        .withColumn(
            "file_offset",
            F.sum("num_files").over(Window.rowsBetween(Window.unboundedPreceding, -1)),
        )
        .na.fill(0, "file_offset")
        .cache()
    )

    num_partitions = partition_counts.agg(F.sum("num_files")).collect()[0][0]

    return (
        df.join(partition_counts, on=partition_col)
        .withColumn(
            "partition_index", F.floor(F.rand() * F.col("num_files")) + F.col("file_offset")
        )
        # The DataFrame API doesn't let you explicitly set the partition key; only RDDs do.
        # So we convert to an RDD, repartition according to the partition index, then convert back.
        .rdd.map(lambda r: (int(r["partition_index"]), r))
        .partitionBy(num_partitions)
        .map(lambda r: r[1])
        .toDF()
        .drop("count", "num_files", "file_offset", "partition_index")
    )


santon