I have a PySpark DataFrame as shown below:
+---+-------+--------+
|age|balance|duration|
+---+-------+--------+
|  2|   2143|     261|
| 44|     29|     151|
| 33|      2|      76|
| 50|   1506|      92|
| 33|      1|     198|
| 35|    231|     139|
| 28|    447|     217|
|  2|      2|     380|
| 58|    121|      50|
| 43|    693|      55|
| 41|    270|     222|
| 50|    390|     137|
| 53|      6|     517|
| 58|     71|      71|
| 57|    162|     174|
| 40|    229|     353|
| 45|     13|      98|
| 57|     52|      38|
|  3|      0|     219|
|  4|      0|      54|
+---+-------+--------+
My expected output should look like this:
+---+-------+--------+-------+-----------+------------+
|age|balance|duration|age_out|balance_out|duration_out|
+---+-------+--------+-------+-----------+------------+
|  2|   2143|     261|      1|          1|           0|
| 44|     29|     151|      0|          0|           0|
| 33|      2|      76|      0|          0|           0|
| 50|   1506|      92|      0|          1|           0|
| 33|      1|     198|      0|          0|           0|
| 35|    231|     139|      0|          0|           0|
| 28|    447|     217|      0|          0|           0|
|  2|      2|     380|      1|          0|           0|
| 58|    121|      50|      0|          0|           0|
| 43|    693|      55|      0|          0|           0|
| 41|    270|     222|      0|          0|           0|
| 50|    390|     137|      0|          0|           0|
| 53|      6|     517|      0|          0|           1|
| 58|     71|      71|      0|          0|           0|
| 57|    162|     174|      0|          0|           0|
| 40|    229|     353|      0|          0|           0|
| 45|     13|      98|      0|          0|           0|
| 57|     52|      38|      0|          0|           0|
|  3|      0|     219|      1|          0|           0|
|  4|      0|      54|      0|          0|           0|
+---+-------+--------+-------+-----------+------------+
My objective is to identify the outlier records in the data set using the interquartile range (IQR) method, as implemented in the Python code below. Within each column, a value that is an outlier should be flagged as 1 and every other value as 0.
I can do this in plain Python with NumPy using the code below:
import numpy as np

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos = np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos] = 1
    return ser
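As a quick check that the expected output follows from this rule, the sketch below applies the same bounds to the age column from the table above. It assumes the values are held in a NumPy array (the element-wise comparisons in outliers_iqr would not work on a plain Python list), and the variable names are only illustrative.

```python
import numpy as np

# Age column from the sample DataFrame, in row order.
age = np.array([2, 44, 33, 50, 33, 35, 28, 2, 58, 43,
                41, 50, 53, 58, 57, 40, 45, 57, 3, 4])

# np.percentile interpolates linearly between neighbouring values by default.
q1, q3 = np.percentile(age, [25, 75])   # 31.75 and 50.75
iqr = q3 - q1                           # 19.0
lower = q1 - 1.5 * iqr                  # 3.25
upper = q3 + 1.5 * iqr                  # 79.25

# 1 where the value falls outside [lower, upper], else 0.
age_out = ((age < lower) | (age > upper)).astype(int)
print(lower, upper)
print(age_out.tolist())
```

Only the ages 2, 2 and 3 fall below the lower bound of 3.25, so age 4 stays unflagged, which matches the age_out column in the expected output.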
But I want to do the same thing in PySpark. Can someone help me with this?
My PySpark code:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos = np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos] = 1
    return(float(ser))

outliers_iqr_udf = udf(outliers_iqr, FloatType())
DF.withColumn('age_out', outliers_iqr_udf(DF.select('age').collect())).show()
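For reference, here is a minimal sketch of one way the flags could be produced in PySpark without a UDF. The attempt above cannot work as written for two reasons:

- A Python UDF is evaluated row by row, so it never sees the whole column.
- DF.select('age').collect() returns a Python list of Row objects, not a Column.

The sketch computes both quartiles once per column with Spark SQL's percentile aggregate, which is exact and interpolates linearly like np.percentile's default. It then compares each value with the resulting bounds. The names iqr_bounds and add_iqr_flags are hypothetical helpers, not part of PySpark, and the sketch assumes the listed columns are numeric with at least one non-null value.

```python
def iqr_bounds(q1, q3, k=1.5):
    """Return (lower, upper) outlier bounds from the first and third quartiles."""
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def add_iqr_flags(df, cols, k=1.5):
    """Append a '<col>_out' column (1 = outlier, 0 = not) for each column in cols."""
    # Imported here so iqr_bounds stays usable without a Spark installation.
    from pyspark.sql import functions as F

    # One aggregation pass returns [Q1, Q3] for every requested column.
    aggs = [
        F.expr(f"percentile(`{c}`, array(0.25, 0.75))").alias(c)
        for c in cols
    ]
    quartiles = df.agg(*aggs).first()

    for c in cols:
        q1, q3 = quartiles[c]
        lower, upper = iqr_bounds(q1, q3, k)
        is_outlier = (F.col(c) < lower) | (F.col(c) > upper)
        # A null value makes the condition null, so it falls through to 0.
        df = df.withColumn(c + "_out", F.when(is_outlier, 1).otherwise(0))
    return df
```

With the DataFrame from the question, the call would be add_iqr_flags(DF, ['age', 'balance', 'duration']).show(). DataFrame.approxQuantile is a possible alternative for large data, but it returns an actual element of the column rather than an interpolated value, so its bounds can differ from the NumPy ones on a sample this small.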