In this case, you are not really suffering from data skew. The NY Taxi Dataset is a single Parquet file that was not partitioned by Spark beforehand, so effectively all of its rows end up in one partition when you read it.
To demonstrate this, you can start up a spark-shell with the following command:
spark-shell --master "local[4]" --conf "spark.sql.files.maxPartitionBytes=10485760"
Then, you can try the following:
scala> val df = spark.read.parquet("yellow_tripdata_2023-01.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]
scala> import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.functions.spark_partition_id
scala> df.groupBy(spark_partition_id).count
res1: org.apache.spark.sql.DataFrame = [SPARK_PARTITION_ID(): int, count: bigint]
scala> df.groupBy(spark_partition_id).count.show
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   1|3066766|
+--------------------+-------+
So it seems like there is only 1 partition!
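As a side note: groupBy(spark_partition_id) only shows partitions that actually contain rows. If you also want the total number of partitions the read produced (empty ones included), a quick check is the standard getNumPartitions method on the underlying RDD, sketched here:
scala> df.rdd.getNumPartitions   // total partition count, empty partitions included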
Now, if you want to have this dataset partitioned on disk, you can .repartition it. Running the following command in a spark-shell:
scala> df.repartition(4).write.parquet("test.parquet")
will write out a partitioned dataset. You can verify this on the command line:
$ tree yellow_tripdata_2023-01.parquet
yellow_tripdata_2023-01.parquet
$ tree test.parquet/
test.parquet/
├── part-00000-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00001-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00002-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
├── part-00003-e8c43d21-24c9-4795-821d-2a8945c10e1b-c000.snappy.parquet
└── _SUCCESS
You see that test.parquet has been partitioned on disk as well. Finally, if you look at the tasks of this .write.parquet action (for example in the Spark UI), you'll see that they are now properly balanced too.
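If you'd rather check this from the shell than from the Spark UI, one quick sanity check (just a sketch, using Spark's built-in input_file_name function) is to count the rows per output file of test.parquet:
scala> import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.input_file_name
scala> // each part-0000x file should hold roughly a quarter of the ~3M rows
scala> spark.read.parquet("test.parquet").groupBy(input_file_name()).count.show(false)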

What does spark.sql.files.maxPartitionBytes do then?
The spark.sql.files.maxPartitionBytes configuration exists to prevent Spark from processing too many partitions when there are more input partitions than cores in your cluster.
You can imagine that each partition comes with a bit of overhead. By setting this value (128MB by default), you're telling Spark: "You may group my input data into fewer partitions if they are too small, but don't make any single partition larger than this limit." Again, this is only relevant if there are more input partitions than cores in your cluster.
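Note that spark.sql.files.maxPartitionBytes is a regular (non-static) SQL configuration, so you don't have to pass it at launch time; as a small sketch, you can also inspect or change it from within a running shell, and it will apply to DataFrames you read afterwards:
scala> // check the current value
scala> spark.conf.get("spark.sql.files.maxPartitionBytes")
scala> // lower it to 50MB for subsequent file scans
scala> spark.conf.set("spark.sql.files.maxPartitionBytes", 52428800L)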
We can test this out with the code that we wrote above. We know we wrote our test.parquet file with 4 partitions of around 20MB each. Let's read this file in with spark.sql.files.maxPartitionBytes=52428800 (50MB). This should group at least 2 input partitions into a single one.
We're going to do this test with 2 cluster sizes. Once with 4 cores:
spark-shell --master "local[4]" --conf "spark.sql.files.maxPartitionBytes=52428800"
scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]
scala> import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.functions.spark_partition_id
scala> df.groupBy(spark_partition_id).count.show
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|766691|
|                   3|766692|
|                   2|766691|
|                   0|766692|
+--------------------+------+
Here you see that the input partitions were not grouped together. This is because our cluster has 4 cores, so it's still more efficient to fully parallelize the partitions over the available cores, even though each of them is smaller than maxPartitionBytes.
And once with 2 cores:
spark-shell --master "local[2]" --conf "spark.sql.files.maxPartitionBytes=52428800"
scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp ... 17 more fields]
scala> import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.functions.spark_partition_id
scala> df.groupBy(spark_partition_id).count.show
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   1|1533383|
|                   0|1533383|
+--------------------+-------+
Here the input partitions were indeed grouped together! This is because the total number of cores in our cluster is smaller than the number of partitions in the input data, so it's worth grouping them together to reduce overhead.
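For reference, the decision of how large each input partition should be is made roughly like this (a simplified sketch of Spark's FilePartition.maxSplitBytes logic; the exact formula can differ a bit between versions):
// simplified sketch: the target split size is
//   min(spark.sql.files.maxPartitionBytes, max(spark.sql.files.openCostInBytes, totalBytes / cores))
// where openCostInBytes defaults to 4MB and "cores" is the default parallelism of the cluster
def targetSplitBytes(maxPartitionBytes: Long, openCostInBytes: Long, totalBytes: Long, cores: Int): Long =
  math.min(maxPartitionBytes, math.max(openCostInBytes, totalBytes / cores))
// our experiment: 4 files of ~20MB (80MB total), cap of 50MB
// 4 cores: min(50MB, max(4MB, 80MB / 4)) = 20MB -> each file keeps its own partition
// 2 cores: min(50MB, max(4MB, 80MB / 2)) = 40MB -> two ~20MB files get packed together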
I've explained how this input splitting is handled in a bit more detail in this SO answer, in case you're interested in more background.