
I have a column Foo which contains double values such as:

[ 100.4, 39.6, 98.2, 10.8, 62.1, 69.6 … ]

I would like to repartition using a window of 10, which would generate a dataset something like:

Foo=10
Foo=20
Foo=30
Foo=40
Foo=50
Foo=60
Foo=70
Foo=80
Foo=90
Foo=100
Foo=110

Using repartition(number: int, colname: str) splits the dataframe into the given number of files, but I cannot choose the window.

So how can I do this in PySpark?

thanks

bioinfornatics

2 Answers


I'm not sure what you mean by repartitioning, but in any case, assuming you have a df of:

+-----+
|  Foo|
+-----+
|100.4|
| 39.6|
| 98.2|
| 10.8|
| 62.1|
| 69.6|
+-----+

You can easily round your values:

from pyspark.sql.functions import col, floor

df2 = df.withColumn('Foo_binned', floor(col('Foo') / 10) * 10)
df2.show()
+-----+----------+
|  Foo|Foo_binned|
+-----+----------+
|100.4|       100|
| 39.6|        30|
| 98.2|        90|
| 10.8|        10|
| 62.1|        60|
| 69.6|        60|
+-----+----------+

If this is the result you are looking for, you can select/rename just the new column. You can also change the rounding method depending on your requirements (floor, round, ceil).
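
For instance, a minimal sketch of swapping in a different rounding function and keeping only the binned column (the names df_ceil, df_round and df_final are just illustrative):

from pyspark.sql.functions import col, ceil, round as sql_round

# Same binning, but rounding up instead of down
df_ceil = df.withColumn('Foo_binned', ceil(col('Foo') / 10) * 10)

# Or round to the nearest multiple of 10
df_round = df.withColumn('Foo_binned', sql_round(col('Foo') / 10) * 10)

# Keep only the binned column, renamed back to Foo
df_final = df_ceil.select(col('Foo_binned').alias('Foo'))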

If by repartitioning you actually want to physically save the values in different folders based on the bucketing into 10s, you can run:

df2.write.partitionBy('Foo_binned').csv('./foos.csv')

This will partition the data while saving:

30.03.2020  23:05                 8 ._SUCCESS.crc
30.03.2020  23:05    <DIR>          Foo_binned=10
30.03.2020  23:05    <DIR>          Foo_binned=100
30.03.2020  23:05    <DIR>          Foo_binned=30
30.03.2020  23:05    <DIR>          Foo_binned=60
30.03.2020  23:05    <DIR>          Foo_binned=90
30.03.2020  23:05                 0 _SUCCESS
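
As a side note (not part of the original answer), when you read that directory back, Spark discovers Foo_binned from the folder names, so it comes back as a regular column:

# Reading the partitioned output back; Spark infers Foo_binned from the
# directory names (Foo_binned=10, Foo_binned=30, ...)
df_back = spark.read.csv('./foos.csv', inferSchema=True)
df_back.printSchema()  # Foo comes back as _c0 since the CSVs were written without a header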

Last but not least, if you just want your in-memory data partitioned by those buckets, it's pretty hard to achieve, because, well, you shouldn't be doing that. Spark includes an optimization engine that will do its best when you just let it:

df = spark.createDataFrame([ (100.2,), (100.1,), (100.7,), (100.4,), (39.6, ), (39.6, ), (39.6, ), (39.6, ), (98.2, ), (10.8, ), (10.2, ), (10.8, ), (10.8, ), (62.1, ), (69.6, )], ['Foo'])
df2 = df.repartitionByRange('Foo')
print('No of partitions', df2.rdd.getNumPartitions())

No of partitions 8
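
If you really do need one in-memory partition per bucket anyway, a rough sketch (not from the answer above, and note it costs an extra shuffle plus a separate count job) is to hash-partition on the binned column:

from pyspark.sql.functions import col, floor

# Rebuild the binned column on this df, then hash-partition on it so that
# all rows of the same bucket land in the same partition
df_binned = df.withColumn('Foo_binned', floor(col('Foo') / 10) * 10)
n = df_binned.select('Foo_binned').distinct().count()
df_part = df_binned.repartition(n, 'Foo_binned')
print('No of partitions', df_part.rdd.getNumPartitions())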

Daniel
  • Thanks Daniel, I used the word `repartition` because to me that is what was used to achieve the goal: https://stackoverflow.com/questions/44808415/spark-parquet-partitioning-large-number-of-files – bioinfornatics Mar 30 '20 at 22:32

Adding to Daniel's answer.

+-----+----------+
|  Foo|Foo_binned|
+-----+----------+
|100.4|       100|
| 39.6|        30|
| 98.2|        90|
| 10.8|        10|
| 62.1|        60|
| 69.6|        60|
+-----+----------+

This will ensure that for every Foo range, you get one file:

from pyspark.sql import functions as F

# df2 is the dataframe with the Foo_binned column from above
n = df2.select(F.col('Foo_binned')).distinct().count()

df2.repartition(n, 'Foo_binned') \
    .write \
    .partitionBy('Foo_binned') \
    .csv(path)
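
The repartition on Foo_binned before the write is what keeps the file count down: each bucket value ends up in a single in-memory partition, so partitionBy produces one file per Foo_binned folder instead of one file per partition that happens to contain that value.
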
murtihash