I need to bucket the values in a DF into various categories. Below is the input and the expected output. I saw this answer here, but I would like to do it without converting the DF into an RDD if possible.

Input
+---------+-------------------+
|       ID|              value|
+---------+-------------------+
|        2|              50.34|
|        4|               34.4|
|        6|               48.7|
|       10|               72.4|
+---------+-------------------+

OutputDF
+---------+-------------------+------+
|   bucket|               size| count|
+---------+-------------------+------+
|        0|               0-20|     0|
|        1|              20-40|     1|
|        2|              40-60|     2|
|        3|              60-80|     1|
+---------+-------------------+------+

1 Answer

You can use Bucketizer to bin the values according to the splits you wish to apply. Once each row is flagged with its bucket, you can further categorize the rows with a udf that maps each bin index to the range it corresponds to.

Data Preparation

input_str = """
2|              50.34|
4|               34.4|
6|               48.7|
10|               72.4
""".split("|")

input_values = list(map(lambda x: x.strip() if x.strip() != '|' else None, input_str))

cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "id,value".split(',')))
        
n = len(input_values)
n_cols = 2

input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]

sparkDF = sql.createDataFrame(input_list, cols)

sparkDF = sparkDF.withColumn('value',F.col('value').cast('float'))\
                .withColumn('id',F.col('id').cast('int'))

sparkDF.show()

+---+-----+
| id|value|
+---+-----+
|  2|50.34|
|  4| 34.4|
|  6| 48.7|
| 10| 72.4|
+---+-----+
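
As an aside, the string parsing above is only one way to reproduce the sample data; if you already have the rows, a direct createDataFrame call gives the same result (a minimal sketch, assuming the same sql SparkSession as above):

sparkDF = sql.createDataFrame(
    [(2, 50.34), (4, 34.4), (6, 48.7), (10, 72.4)],
    ['id', 'value']
)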

Bucketizer

from pyspark.ml.feature import Bucketizer

bucketizer = Bucketizer(
    splits=[0,20,40,60,80],
    inputCol='value', 
    outputCol='value_bin'
)

sparkDF = bucketizer.transform(sparkDF)

sparkDF.show()

+---+-----+---------+
| id|value|value_bin|
+---+-----+---------+
|  2|50.34|      2.0|
|  4| 34.4|      1.0|
|  6| 48.7|      2.0|
| 10| 72.4|      3.0|
+---+-----+---------+
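
Note that Bucketizer raises an error for values that fall outside the split range, so if your data can go below 0 or above 80 the splits need to cover those values as well. A sketch with open-ended outer splits (only needed if such values exist; the extra lower split shifts every bin index up by one):

bucketizer = Bucketizer(
    splits=[float('-inf'), 0, 20, 40, 60, 80, float('inf')],  # open-ended outer buckets
    inputCol='value',
    outputCol='value_bin'
)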

Categorize Value Bin

split_arr = bucketizer.getSplits()
### O/P --> [0.0, 20.0, 40.0, 60.0, 80.0]

format_udf = F.udf(lambda x:f'{int(split_arr[int(x)])}-{int(split_arr[int(x)+1])}',StringType())

sparkDF = sparkDF.withColumn(
    'numbers_bucket',format_udf('value_bin')
)

sparkDF.show()

+---+-----+---------+--------------+
| id|value|value_bin|numbers_bucket|
+---+-----+---------+--------------+
|  2|50.34|      2.0|         40-60|
|  4| 34.4|      1.0|         20-40|
|  6| 48.7|      2.0|         40-60|
| 10| 72.4|      3.0|         60-80|
+---+-----+---------+--------------+
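
If you would rather avoid the udf, the same label column can be built from split_arr with chained when expressions (a sketch using only built-in functions, equivalent to format_udf above):

# Human-readable label for each bin index, e.g. '0-20', '20-40', ...
labels = [f'{int(lo)}-{int(hi)}' for lo, hi in zip(split_arr[:-1], split_arr[1:])]

# Chain one WHEN clause per bucket index
label_col = F.when(F.col('value_bin') == 0, labels[0])
for i, label in enumerate(labels[1:], start=1):
    label_col = label_col.when(F.col('value_bin') == i, label)

sparkDF = sparkDF.withColumn('numbers_bucket', label_col)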

Final Output - GroupBy

sparkDF = sparkDF.groupby(['value_bin','numbers_bucket']).count()

sparkDF.show()

+---------+--------------+-----+
|value_bin|numbers_bucket|count|
+---------+--------------+-----+
|      2.0|         40-60|    2|
|      3.0|         60-80|    1|
|      1.0|         20-40|    1|
+---------+--------------+-----+
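
The expected output in the question also shows the empty 0-20 bucket with a count of 0. To include buckets that received no rows, one option (a sketch reusing split_arr and the grouped DataFrame above) is to left-join the counts onto a DataFrame of all possible bins and fill the gaps with 0:

# One row per possible bin, labelled from the split points
all_bins = sql.createDataFrame(
    [(float(i), f'{int(lo)}-{int(hi)}')
     for i, (lo, hi) in enumerate(zip(split_arr[:-1], split_arr[1:]))],
    ['value_bin', 'numbers_bucket']
)

outputDF = (
    all_bins.join(sparkDF, on=['value_bin', 'numbers_bucket'], how='left')
            .fillna(0, subset=['count'])
            .orderBy('value_bin')
)

outputDF.show()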
Vaebhav
  • What about the bucket that counts to 0? – fskj Oct 05 '21 at 08:30
  • In the example output provided in the question, bucket 0 is reported with a count of 0. – fskj Oct 05 '21 at 09:25
  • This seems a little off. In the initial DF there was no value between 0-20, but there is a count of 1 in the result. Also, originally there was 1 value between 60-80, but that category is missing in the result. It looks like everything is 1 bucket lower than it should be – Tony LaRussa Oct 05 '21 at 09:27
  • Updated the code to fetch the correct bucket bin that the value corresponds to – Vaebhav Oct 05 '21 at 09:32