2

Say I have a PySpark DataFrame 'data' as follows. I want to partition the data by "Period". More precisely, I want each period of data to be stored on its own partition (see the example below the 'data' DataFrame).

data = sc.parallelize([[1,1,0,14277.4,0], \
[1,2,0,14277.4,0], \
[2,1,0,4741.91,0], \
[2,2,0,4693.03,0], \
[3,1,2,9565.93,0], \
[3,2,2,9566.05,0], \
[4,2,0,462.68,0], \
[5,1,1,3549.66,0], \
[5,2,5,3549.66,1], \
[6,1,1,401.52,0], \
[6,2,0,401.52,0], \
[7,1,0,1886.24,0], \
[7,2,0,1886.24,0]]) \
.toDF(("Acct","Period","Status","Bal","CloseFlag"))

data.show(100)

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     1|     0|14277.4|        0|
|   1|     2|     0|14277.4|        0|
|   2|     1|     0|4741.91|        0|
|   2|     2|     0|4693.03|        0|
|   3|     1|     2|9565.93|        0|
|   3|     2|     2|9566.05|        0|
|   4|     2|     0| 462.68|        0|
|   5|     1|     1|3549.66|        0|
|   5|     2|     5|3549.66|        1|
|   6|     1|     1| 401.52|        0|
|   6|     2|     0| 401.52|        0|
+----+------+------+-------+---------+

For example:

Partition 1:

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     1|     0|14277.4|        0|
|   2|     1|     0|4741.91|        0|
|   3|     1|     2|9565.93|        0|
|   5|     1|     1|3549.66|        0|
|   6|     1|     1| 401.52|        0|
+----+------+------+-------+---------+

Partition 2:

+----+------+------+-------+---------+
|Acct|Period|Status|    Bal|CloseFlag|
+----+------+------+-------+---------+
|   1|     2|     0|14277.4|        0|
|   2|     2|     0|4693.03|        0|
|   3|     2|     2|9566.05|        0|
|   4|     2|     0| 462.68|        0|
|   5|     2|     5|3549.66|        1|
|   6|     2|     0| 401.52|        0|
+----+------+------+-------+---------+
murtihash
DeeeeRoy

1 Answer

4

The approach is to repartition first, so that the number of partitions equals the number of unique periods, and then partition by the Period column when saving.

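from pyspark.sql import functions as F

# Number of distinct periods = number of partitions to create
n = data.select(F.col('Period')).distinct().count()

# Repartition, then write one directory per Period value
data.repartition(n) \
    .write \
    .partitionBy("Period") \
    .mode("overwrite") \
    .format("parquet") \
    .saveAsTable("testing")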
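
To make the effect of the repartition step more concrete, here is a toy model in plain Python (it does not run Spark). It assumes that repartition(n) without a column deals rows out round-robin, that repartition(n, 'Period') keeps all rows of one period in the same partition, and that partitionBy writes one file per (partition, Period) pair that actually holds rows. The function names are made up for illustration, and Python's built-in hash stands in for Spark's own hash function.

```python
from collections import defaultdict

# (Acct, Period) pairs taken from the question's 'data' DataFrame.
ROWS = [(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (4, 2),
        (5, 1), (5, 2), (6, 1), (6, 2), (7, 1), (7, 2)]


def round_robin(rows, n):
    """Toy stand-in for data.repartition(n): deal rows out in turn."""
    parts = defaultdict(list)
    for i, row in enumerate(rows):
        parts[i % n].append(row)
    return parts


def by_period(rows, n):
    """Toy stand-in for data.repartition(n, 'Period'): same Period, same partition."""
    parts = defaultdict(list)
    for row in rows:
        parts[hash(row[1]) % n].append(row)
    return parts


def files_per_period(parts):
    """Count files per Period directory: one per (partition, Period) pair."""
    counts = defaultdict(int)
    for rows in parts.values():
        for period in {p for _, p in rows}:
            counts[period] += 1
    return dict(counts)


n = len({p for _, p in ROWS})  # 2 distinct periods
print(files_per_period(round_robin(ROWS, n)))  # {1: 2, 2: 2}
print(files_per_period(by_period(ROWS, n)))    # {1: 1, 2: 1}
```

Under these assumptions, every Period directory still gets one file per in-memory partition after a plain repartition(n), whereas repartitioning on the column yields a single file per period.
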
B25Dec
murtihash
  • My data has 1.1 billion+ rows and roughly 160 columns. This method is taking a long time. Are there any more efficient ways to do this? – DeeeeRoy Feb 04 '20 at 18:34
  • The repartition is a shuffle operation, which means it redistributes all the data, so yes, it will take time. You could try df.persist(StorageLevel.MEMORY_AND_DISK) after the repartition but before writing. It might be faster, depending on your config. – murtihash Feb 04 '20 at 18:42
  • What's the difference between your answer and df.repartition(n, col('colname'))? – DeeeeRoy Feb 04 '20 at 21:45
  • With df.repartition(n, col('colname')), the rows are split into n partitions by 'colname', so only a limited number of its values end up in each partition. My reasoning was: why add the extra step when we are already using partitionBy? You could try df.repartition(numPartitions, cols).write.partitionBy(cols) and compare performance. This link has a more in-depth answer to your question: https://stackoverflow.com/questions/50775870/pyspark-efficiently-have-partitionby-write-to-same-number-of-total-partitions-a – murtihash Feb 04 '20 at 22:01
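
The variant suggested in the last comment could be sketched roughly as follows. The helper name write_one_file_per_period is hypothetical, and the sketch assumes df is a PySpark DataFrame with a "Period" column and that a metastore is available for saveAsTable. Whether it beats the plain repartition(n) version depends on the data and the cluster, so it is something to benchmark rather than a guaranteed improvement.

```python
def write_one_file_per_period(df, table_name, column="Period"):
    """Repartition by `column`, then write one directory per distinct value.

    Hypothetical helper: `df` is assumed to be a PySpark DataFrame.
    Returns the number of distinct values found in `column`.
    """
    # Number of distinct values, computed the same way as in the answer.
    n = df.select(column).distinct().count()
    (
        df.repartition(n, column)   # rows with the same value share a partition
        .write
        .partitionBy(column)        # one output directory per value
        .mode("overwrite")
        .format("parquet")
        .saveAsTable(table_name)
    )
    return n


# Usage, with the DataFrame from the question:
# write_one_file_per_period(data, "testing")
```

One trade-off to keep in mind: with only a few distinct periods, all rows of a period pass through a single task, so the write has very little parallelism. On a table with over a billion rows that may be slower than writing several files per period.
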