
I have a large set of end-result data, and it is non-uniformly distributed over the column I am interested in. When I write it directly with partitioning, each partition ends up with the same number of files as spark.sql.shuffle.partitions. This causes each file in a crowded partition to be very large (in GBs), while in some other partitions the files are really small (even in KBs). Is there a way to change the number of files per partition?

Example:

+----------------------------+----------+
| number of rows in category | category |
+----------------------------+----------+
| 50000000000                |    A     |
| 200000                     |    B     |
| 30000                      |    C     |
+----------------------------+----------+

If I do:

df.write.partitionBy("category").parquet(output_dir)

The files in folder "A" are large, whereas the ones in "B" and "C" are small.

Sinan Erdem

2 Answers


Try repartitioning the dataframe by multiple columns (if possible and logical for your data).

Example:

df.repartition("category", "<some_other_column_name>").write.partitionBy("category").parquet(output_dir)
joshi.n

I would suggest calling df.repartition(NUM_PARTITIONS) on the dataframe to evenly distribute the rows over the partitions. In your case, the rows of category A would be distributed over more partitions than, say, those of category C. After the repartition, when you call write.partitionBy("category"), category A was spread over more partitions, so more files are written for it (one file per partition that holds rows of category A).

NUM_PARTITIONS can also be computed dynamically, e.g. NUM_PARTITIONS = df.count() // ROWS_PER_PARTITION. You can decide ROWS_PER_PARTITION based on the byte size per row.

NUM_PARTITIONS = 100 
df.repartition(NUM_PARTITIONS).write.partitionBy("category").parquet(output_dir)
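
And a sketch of the dynamic variant mentioned above, reusing df and output_dir from the example (ROWS_PER_PARTITION = 1000000 is just an assumed target; tune it to your average row size):

# Aim for roughly ROWS_PER_PARTITION rows per output file.
ROWS_PER_PARTITION = 1000000  # assumed target, not a measured value
NUM_PARTITIONS = max(1, df.count() // ROWS_PER_PARTITION)

df.repartition(NUM_PARTITIONS).write.partitionBy("category").parquet(output_dir)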

If you want to check how the rows are distributed across partitions, you can use this:

import pyspark.sql.functions as f
df.withColumn("partition_id",f.spark_partition_id()).groupBy("partition_id").count().show()

For a more detailed discussion, see Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?

Manoj Singh
  • Correct me if I am wrong, but I believe your answer suggests I can change the number of partitions based on the number of rows in a dataframe. So for a large dataframe it can be increased dynamically. But how is it possible to have a different number of partitions per category? I want 10 partitions for category A and 2 partitions for category B, for example. – Sinan Erdem Dec 05 '18 at 11:43
  • 1
  • It's not possible to specify the number of partitions by column value, but this is a side effect of the repartition: if there are too many rows with category=A to fit into a single partition, they spill over to other partitions, whereas the smaller categories fit into a single partition. – Manoj Singh Dec 05 '18 at 12:21
  • I tried this and it didn't help. I believe it is because there is a sufficient number of rows in each group, so each partition has at least one row from every category. For the smallest group C, that's 30000 rows vs 100 partitions... – Sinan Erdem Dec 05 '18 at 15:11
  • Try specifying both the number of partitions and the category column: df.repartition(NUM_PARTITIONS, "category") – Manoj Singh Dec 05 '18 at 15:14 (a sketch of this is below)
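
A minimal sketch of that last suggestion, reusing df and output_dir from above and the spark_partition_id check from the answer (100 is an arbitrary assumed partition count):

import pyspark.sql.functions as f

# Hash-partition on "category" into at most 100 partitions. Note that all
# rows sharing a category value hash to the same partition, so each
# category folder is written as a single file.
repartitioned = df.repartition(100, "category")

# Inspect how many rows of each category landed in each partition.
repartitioned.withColumn("partition_id", f.spark_partition_id()) \
    .groupBy("category", "partition_id").count().show()

repartitioned.write.partitionBy("category").parquet(output_dir)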