You can use Bucketizer to bin the values according to the splits you want. Once each row is flagged with its bucket index, you can further label the buckets with a UDF that maps each index to the range it corresponds to.
Data Preparation
from pyspark.sql import SparkSession, functions as F

sql = SparkSession.builder.getOrCreate()  # SparkSession used for createDataFrame below

input_str = """
2| 50.34|
4| 34.4|
6| 48.7|
10| 72.4
""".split("|")

# Strip whitespace from every token parsed out of the raw string
input_values = list(map(lambda x: x.strip() if x.strip() != '|' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "id,value".split(',')))

n = len(input_values)
n_cols = 2

# Re-group the flat token list into (id, value) tuples
input_list = [tuple(input_values[i:i + n_cols]) for i in range(0, n, n_cols)]

sparkDF = sql.createDataFrame(input_list, cols)

sparkDF = sparkDF.withColumn('value', F.col('value').cast('float'))\
                 .withColumn('id', F.col('id').cast('int'))

sparkDF.show()
+---+-----+
| id|value|
+---+-----+
| 2|50.34|
| 4| 34.4|
| 6| 48.7|
| 10| 72.4|
+---+-----+
Bucketizer
from pyspark.ml.feature import Bucketizer

# Assign each value a bucket index based on the ranges [0,20), [20,40), [40,60), [60,80]
bucketizer = Bucketizer(
    splits=[0, 20, 40, 60, 80],
    inputCol='value',
    outputCol='value_bin'
)

sparkDF = bucketizer.transform(sparkDF)
sparkDF.show()
+---+-----+---------+
| id|value|value_bin|
+---+-----+---------+
| 2|50.34| 2.0|
| 4| 34.4| 1.0|
| 6| 48.7| 2.0|
| 10| 72.4| 3.0|
+---+-----+---------+
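Note that Bucketizer raises an error for any value that falls outside the configured splits (e.g. 90 with the splits above). If the range of value is not guaranteed, one option is to make the outer buckets open-ended; a minimal sketch of that variant (not needed for the sample data, which already fits inside [0, 80]):

# Assumed variant: open-ended first/last buckets so out-of-range values still get a bucket
bucketizer = Bucketizer(
    splits=[-float('inf'), 0, 20, 40, 60, 80, float('inf')],
    inputCol='value',
    outputCol='value_bin'
)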
Categorize Value Bin
from pyspark.sql.types import StringType

split_arr = bucketizer.getSplits()
### O/P --> [0.0, 20.0, 40.0, 60.0, 80.0]

# Map each bucket index to a readable "<lower>-<upper>" label
format_udf = F.udf(lambda x: f'{int(split_arr[int(x)])}-{int(split_arr[int(x) + 1])}', StringType())

sparkDF = sparkDF.withColumn(
    'numbers_bucket', format_udf('value_bin')
)
sparkDF.show()
+---+-----+---------+--------------+
| id|value|value_bin|numbers_bucket|
+---+-----+---------+--------------+
| 2|50.34| 2.0| 40-60|
| 4| 34.4| 1.0| 20-40|
| 6| 48.7| 2.0| 40-60|
| 10| 72.4| 3.0| 60-80|
+---+-----+---------+--------------+
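If you prefer to avoid the Python UDF, the same labels can be built with native column expressions, for example a chained when over the bucket indices. A rough equivalent sketch, assuming the same split_arr as above:

# Sketch: UDF-free labelling via a chained when() over the known bucket indices
labels = [f'{int(lo)}-{int(hi)}' for lo, hi in zip(split_arr, split_arr[1:])]

numbers_bucket = F.when(F.col('value_bin') == 0, labels[0])
for i, label in enumerate(labels[1:], start=1):
    numbers_bucket = numbers_bucket.when(F.col('value_bin') == i, label)

sparkDF = sparkDF.withColumn('numbers_bucket', numbers_bucket)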
Final Output - GroupBy
sparkDF = sparkDF.groupby(['value_bin','numbers_bucket']).count()
sparkDF.show()
+---------+--------------+-----+
|value_bin|numbers_bucket|count|
+---------+--------------+-----+
| 2.0| 40-60| 2|
| 3.0| 60-80| 1|
| 1.0| 20-40| 1|
+---------+--------------+-----+
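The grouped result is unordered; if you want the buckets listed by their index, add an orderBy before show:

sparkDF.orderBy('value_bin').show()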