
I work inside a PySpark Docker container.

I'm trying to read a Parquet file from the local filesystem into Spark and have it split into several partitions as it is read:

  1. I download the NY Taxi dataset, which is 46 MB in size:
curl -Lo taxi.parquet 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet'
  2. I start a Spark session with the following configuration (max partition size is 10 MB):
spark = (
  SparkSession
    .builder
    .appName("Bucketing")
    .master("local[4]")
    .config("spark.sql.files.maxPartitionBytes", 10 * 1000 * 1024)
    .getOrCreate()
)
  3. I read the Parquet file into a DataFrame:
taxi_df = spark.read.parquet("taxi.parquet")
  4. I trigger an action:
taxi_df.show()
  5. When I look at the Spark UI, I see that there are 4 partitions, but all the data went to a single partition and the other three partitions are empty (a skewed dataset): [screenshot: skewed DataFrame]

According to the documentation, spark.sql.files.maxPartitionBytes should take effect when reading files, but here it apparently leads to skewed partitions.

Question: Is there a way to configure Spark to avoid skewed partitions?

Additional:

I tried two more Parquet files: one is 36 MB, the other is 113 MB. Each time the correct number of partitions was created (3 and 13 respectively), but all the data went to a single partition while the others were left empty, so I assume it's not related to the local[4] config of my Spark session.

neshkeev

1 Answer


In this case, you are not really suffering from data skew. The NY Taxi dataset is a single file that has not been written out in multiple partitions by Spark before, so you're effectively reading it in as 1 partition only.

To demonstrate this, you can start up a spark-shell with the following command:

spark-shell --master "local[4]" --conf "spark.sql.files.maxPartitionBytes=10485760"

Then, you can try the following:

scala> val df = spark.read.parquet("yellow_tripdata_2023-01.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]

scala> import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.functions.spark_partition_id

scala> df.groupBy(spark_partition_id).count
res1: org.apache.spark.sql.DataFrame = [SPARK_PARTITION_ID(): int, count: bigint]

scala> df.groupBy(spark_partition_id).count.show
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   1|3066766|
+--------------------+-------+

So it seems like there is only 1 partition!
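
As a quicker check, df.rdd.getNumPartitions reports the partition count directly, without running an aggregation. In the same spark-shell session it should report 1 here as well, matching the groupBy result above:

scala> df.rdd.getNumPartitions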

Now, if you want to have this dataset partitioned on disk, you can .repartition it.

Running the following command in a spark-shell:

scala> df.repartition(4).write.parquet("test.parquet")

will write out a partitioned dataset. You can verify this on the command line:

$ tree yellow_tripdata_2023-01.parquet
yellow_tripdata_2023-01.parquet

$ tree test.parquet/
test.parquet/
├── part-00000-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00001-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00002-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00003-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
└── _SUCCESS

You see that test.parquet has been partitioned on disk as well. Finally, you'll see that this .write.parquet action now has properly balanced tasks too:

[Spark UI screenshot: balanced tasks]

What does spark.sql.files.maxPartitionBytes do then?

The spark.sql.files.maxPartitionBytes configuration exists to avoid processing too many small partitions when there are more input partitions than cores in your cluster.

You can imagine that each partition comes with a bit of overhead. By setting this option to a certain value (128 MB by default) you're telling Spark: "You can group my input data if the partitions are too small, but don't make them larger than this limit". Again, this is mainly relevant if there are more input partitions than cores in your cluster.
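
For reference, the split size that the file source ends up using can be sketched roughly like this (a simplified paraphrase of Spark's internal FilePartition.maxSplitBytes logic; the helper name and signature here are just illustrative):

// Simplified sketch of how Spark's file source chooses a split size
// (paraphrasing org.apache.spark.sql.execution.datasources.FilePartition.maxSplitBytes;
// illustrative only, not the exact implementation).
def approxMaxSplitBytes(
    maxPartitionBytes: Long, // spark.sql.files.maxPartitionBytes, 128 MB by default
    openCostInBytes: Long,   // spark.sql.files.openCostInBytes, 4 MB by default
    totalBytes: Long,        // total size of the input files (plus an open cost per file)
    parallelism: Int         // falls back to the number of cores in local mode
): Long = {
  val bytesPerCore = totalBytes / parallelism
  math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
}

In other words, the limit only caps the split size when the data per core would otherwise exceed it.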

We can test this out with the code we wrote above. We know we wrote our test.parquet file with 4 partitions of around 20 MB each. Let's read this file in with spark.sql.files.maxPartitionBytes=52428800 (50 MB). This should be able to group at least 2 input partitions into a single one.

We're going to do this test with 2 cluster sizes. Once with 4 cores:

spark-shell --master "local[4]" --conf "spark.sql.files.maxPartitionBytes=52428800"

scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]

scala> df.groupBy(spark_partition_id).count.show
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|766691|
|                   3|766692|
|                   2|766691|
|                   0|766692|
+--------------------+------+

Here you see that the input partitions were not grouped together. This is because our cluster has 4 cores, so it's still more efficient to fully parallelize the partitions over the workers, even though their size is smaller than maxPartitionBytes.

And once with 2 cores:

spark-shell --master "local[2]" --conf "spark.sql.files.maxPartitionBytes=52428800"

scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]

scala> df.groupBy(spark_partition_id).count.show
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   1|1533383|
|                   0|1533383|
+--------------------+-------+

Here the input partitions were indeed grouped together! This is because the total number of CPUs in our cluster is smaller than the number of partitions in the input data, so it's worth grouping them together in order to reduce overhead.
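
Plugging the (approximate) numbers of this experiment into the approxMaxSplitBytes sketch from above gives the same picture (the helper is just the illustration from earlier, not a Spark API, and the 80 MB total ignores the per-file open cost):

// test.parquet: 4 part-files of around 20 MB each, so roughly 80 MB in total.
val totalBytes = 80L * 1024 * 1024
val maxPartitionBytes = 52428800L // 50 MB, as set via --conf above
val openCost = 4194304L           // 4 MB, the default open cost

// 4 cores: bytesPerCore is ~20 MB, so splits are capped at ~20 MB -> one file per partition.
approxMaxSplitBytes(maxPartitionBytes, openCost, totalBytes, 4) // ~20 MB

// 2 cores: bytesPerCore is ~40 MB, so two ~20 MB files fit into one split -> 2 partitions.
approxMaxSplitBytes(maxPartitionBytes, openCost, totalBytes, 2) // ~40 MB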

I've explained how this input splitting is handled in a bit more detail in this SO answer if you're interested in more info on this.

Koedlt
  • That is one way to fix this, but I expected Spark to partition the DataFrame while reading the file. Otherwise it's not clear what this config does. – neshkeev Apr 04 '23 at 08:19
  • I've added some extra info about the `maxPartitionBytes` parameter. Hopefully it helps in clearing this up! – Koedlt Apr 04 '23 at 12:27
  • Unfortunately, it doesn't on my computer (Spark 3.3.1). I executed your commands exactly (in Scala), but all the data still goes to a single partition (according to your `groupBy` query) regardless of the `maxPartitionBytes` value I set. I tried different files. Evidently, it's impossible to split a single Parquet file into partitions while reading it. The setting works for `csv` and `json` though – neshkeev Apr 04 '23 at 22:41
  • Are you making sure to read the `test.parquet` file, which has been written after using `repartition(4)`? You can also verify that by checking the directory structure of that file (using `tree` like I did for example): it should contain 4 part-files and 1 `_SUCCESS` file. – Koedlt Apr 05 '23 at 04:09