I am trying to bin a particular column of a DataFrame based on the ranges given in a dictionary.

Below is the dataframe I used:

df

SNo,Name,CScore
1,x,700
2,y,850
3,z,560
4,a,578
5,b,456
6,c,678

I have created the function below; it works fine when I use it separately.


def binning(column,dict):
    finalColumn=[]
    lent = len(column)
    for i in range(lent):
        for j in range(len(list(dict))):
            if( int(column[i]) in range(list(dict)[j][0],list(dict)[j][1])):
                finalColumn.append(dict[list(dict)[j]])
    return finalColumn

I then used the function in the statement below.

newDf = df.withColumn("binnedColumn",binning(df.select("CScore").rdd.flatMap(lambda x: x).collect(),{(1,400):'Low',(401,900):'High'}))

I am getting the below error:

Traceback (most recent call last):
  File "", line 1, in
  File "C:\spark_2.4\python\pyspark\sql\dataframe.py", line 1988, in withColumn
    assert isinstance(col, Column), "col should be Column"
AssertionError: col should be Column

Any help solving this issue would be appreciated. Thanks.

Vineel
  • You need to make `binning` into a user defined function (`udf`). Also don't name your variables `dict`. Also calling `collect()` *inside* `withColumn` is going to give you *terrible* performance. You can probably achieve the same result using `when()` and `between()`. – pault Apr 22 '19 at 16:00
  • I have used a udf and renamed the dict variable, but it is still not working. Any other suggestions? Thanks. – Vineel Apr 22 '19 at 16:07
  • Please [edit] your question to include a [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) which includes the exact code you've tried and the full error messages or an explanation of why it's not working. It's also recommended that you take the [tour] and read [ask]. – pault Apr 22 '19 at 16:09

1 Answer

Let's start by creating the data:

rdd = sc.parallelize([[1,"x",700],[2,"y",850],[3,"z",560],[4,"a",578],[5,"b",456],[6,"c",678]])
df = rdd.toDF(["SNo","Name","CScore"])
>>> df.show()
+---+----+------+
|SNo|Name|CScore|
+---+----+------+
|  1|   x|   700|
|  2|   y|   850|
|  3|   z|   560|
|  4|   a|   578|
|  5|   b|   456|
|  6|   c|   678|
+---+----+------+

If your final goal is to supply a binning dictionary like the one in your example and look up the corresponding categorical value, a udf is the solution.

The following is your function, rewritten as a plain Python function that bins a single value:

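# Each key is an inclusive (low, high) range; the value is its label.
bin_lookup = {(1, 400): 'Low', (401, 900): 'High'}

def binning(value, lookup_dict=bin_lookup):
    # Return the label of the first range that contains value.
    # Falls through and returns None when no range matches.
    for key in lookup_dict.keys():
        if key[0] <= value <= key[1]:
            return lookup_dict[key]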
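
As a quick sanity check outside Spark, the function can be exercised on the scores from the question. This is a minimal sketch that restates the function so it runs on its own; the boundary values (400, 401, 900, 0) are extra inputs chosen for illustration and are not in the original data. Note that the question's version tests membership with `range(low, high)`, which excludes `high`, whereas the `<=` comparison here includes both ends.

```python
bin_lookup = {(1, 400): "Low", (401, 900): "High"}

def binning(value, lookup_dict=bin_lookup):
    # Return the label of the first range containing value (bounds inclusive).
    for (low, high), label in lookup_dict.items():
        if low <= value <= high:
            return label
    return None  # no range matched

scores = [700, 850, 560, 578, 456, 678]  # CScore column from the question
print([binning(s) for s in scores])

# Illustrative boundary values (not part of the original data).
for value in (400, 401, 900, 0):
    print(value, binning(value))

# The question's range()-based test leaves the upper bound out of the bin.
print(400 in range(1, 400))
```

A value that matches no range yields `None`, which Spark shows as `null` once the function is wrapped in a udf.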

To register it as a udf and apply it to the DataFrame:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

binning_udf = udf(binning, StringType())
>>> df.withColumn("binnedColumn", binning_udf("CScore")).show()
+---+----+------+------------+
|SNo|Name|CScore|binnedColumn|
+---+----+------+------------+
|  1|   x|   700|        High|
|  2|   y|   850|        High|
|  3|   z|   560|        High|
|  4|   a|   578|        High|
|  5|   b|   456|        High|
|  6|   c|   678|        High|
+---+----+------+------------+

Or apply it directly to the rdd:

>>> rdd.map(lambda row: binning(row[-1], bin_lookup)).collect()
['High', 'High', 'High', 'High', 'High', 'High']
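
The comment on the question suggests `when()` and `between()` as an alternative. A rough sketch of that idea follows, assuming the same dictionary of inclusive ranges; `sorted_bins` and `bin_column` are hypothetical helper names introduced here, not part of PySpark. Because the result is an ordinary Column expression, no Python udf is involved.

```python
bin_lookup = {(1, 400): "Low", (401, 900): "High"}

def sorted_bins(lookup):
    # Hypothetical helper: flatten {(low, high): label} into
    # [(low, high, label), ...] ordered by lower bound.
    return sorted((low, high, label) for (low, high), label in lookup.items())

def bin_column(column_name, lookup):
    # Hypothetical helper: chain when()/between() into one Column expression.
    # Imported here so sorted_bins() stays usable without a Spark installation.
    from pyspark.sql import functions as F

    expr = None
    for low, high, label in sorted_bins(lookup):
        condition = F.col(column_name).between(low, high)  # inclusive on both ends
        expr = F.when(condition, label) if expr is None else expr.when(condition, label)
    return expr  # None for an empty lookup; unmatched rows become null

# Usage, assuming df is the DataFrame created at the top of this answer:
# df.withColumn("binnedColumn", bin_column("CScore", bin_lookup)).show()
```

With the sample data this should label every row the same way the udf does, since all six scores fall between 401 and 900.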

E.ZY.