
I have a PySpark dataframe as shown below.

+---+-------+--------+
|age|balance|duration|
+---+-------+--------+
|  2|   2143|     261|
| 44|     29|     151|
| 33|      2|      76|
| 50|   1506|      92|
| 33|      1|     198|
| 35|    231|     139|
| 28|    447|     217|
|  2|      2|     380|
| 58|    121|      50|
| 43|    693|      55|
| 41|    270|     222|
| 50|    390|     137|
| 53|      6|     517|
| 58|     71|      71|
| 57|    162|     174|
| 40|    229|     353|
| 45|     13|      98|
| 57|     52|      38|
|  3|      0|     219|
|  4|      0|      54|
+---+-------+--------+

and my expected output should look like this:

+---+-------+--------+-------+-----------+------------+
|age|balance|duration|age_out|balance_out|duration_out|
+---+-------+--------+-------+-----------+------------+
|  2|   2143|     261|      1|          1|           0|
| 44|     29|     151|      0|          0|           0|
| 33|      2|      76|      0|          0|           0|
| 50|   1506|      92|      0|          1|           0|
| 33|      1|     198|      0|          0|           0|
| 35|    231|     139|      0|          0|           0|
| 28|    447|     217|      0|          0|           0|
|  2|      2|     380|      1|          0|           0|
| 58|    121|      50|      0|          0|           0|
| 43|    693|      55|      0|          0|           0|
| 41|    270|     222|      0|          0|           0|
| 50|    390|     137|      0|          0|           0|
| 53|      6|     517|      0|          0|           1|
| 58|     71|      71|      0|          0|           0|
| 57|    162|     174|      0|          0|           0|
| 40|    229|     353|      0|          0|           0|
| 45|     13|      98|      0|          0|           0|
| 57|     52|      38|      0|          0|           0|
|  3|      0|     219|      1|          0|           0|
|  4|      0|      54|      0|          0|           0|
+---+-------+--------+-------+-----------+------------+

My objective is to identify the outlier records in the dataset using the interquartile range (IQR) method: outlier values should be flagged as 1, otherwise 0.

I can do this in plain Python/NumPy with the function below.

import numpy as np

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos = np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos] = 1
    return ser

But I want to do the same thing in PySpark. Can someone help me?

My PySpark attempt (it does not work as intended):

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos = np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos] = 1
    return float(ser)

outliers_iqr_udf = udf(outliers_iqr, FloatType())

# This fails: collect() returns a list of Rows on the driver, and a UDF has to
# be applied to a Column, not to a collected list.
DF.withColumn('age_out', outliers_iqr_udf(DF.select('age').collect())).show()
RSK

2 Answers


You can use pyspark.sql.DataFrame.approxQuantile inside a dict comprehension to get the 25th and 75th percentile values for each of your columns:

bounds = {
    c: dict(
        zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in df.columns
}

The last argument is the relative error, which you can read about in the docs. The short version is that the lower the number, the more accurate your result will be, but there is a trade-off between accuracy and computational cost. (Here I used 0 to get the exact value, but you may want to choose a different value based on the size of your data.)
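For example, on a very large DataFrame you might trade a little accuracy for speed by passing a small non-zero relative error. A minimal sketch (the 0.05 below is purely illustrative, not a recommendation):

# Approximate quantiles with 5% relative error; usually much cheaper than
# computing exact quantiles on a large DataFrame.
approx_bounds = {
    c: dict(zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0.05)))
    for c in df.columns
}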

Once you have the first and third quartile values, you can compute the iqr and upper/lower bounds quite easily:

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)
#{'age': {'lower': 3.0, 'q1': 33.0, 'q3': 53.0, 'upper': 83.0},
# 'balance': {'lower': -570.0, 'q1': 6.0, 'q3': 390.0, 'upper': 966.0},
# 'duration': {'lower': -143.0, 'q1': 76.0, 'q3': 222.0, 'upper': 441.0}}

Now use pyspark.sql.functions.when in a list comprehension to build the outlier columns based on bounds:

import pyspark.sql.functions as f
df.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in df.columns
    ]
).show()
#+---+-------+--------+-------+-----------+------------+
#|age|balance|duration|age_out|balance_out|duration_out|
#+---+-------+--------+-------+-----------+------------+
#|  2|   2143|     261|      1|          1|           0|
#| 44|     29|     151|      0|          0|           0|
#| 33|      2|      76|      0|          0|           0|
#| 50|   1506|      92|      0|          1|           0|
#| 33|      1|     198|      0|          0|           0|
#| 35|    231|     139|      0|          0|           0|
#| 28|    447|     217|      0|          0|           0|
#|  2|      2|     380|      1|          0|           0|
#| 58|    121|      50|      0|          0|           0|
#| 43|    693|      55|      0|          0|           0|
#| 41|    270|     222|      0|          0|           0|
#| 50|    390|     137|      0|          0|           0|
#| 53|      6|     517|      0|          0|           1|
#| 58|     71|      71|      0|          0|           0|
#| 57|    162|     174|      0|          0|           0|
#| 40|    229|     353|      0|          0|           0|
#| 45|     13|      98|      0|          0|           0|
#| 57|     52|      38|      0|          0|           0|
#|  3|      0|     219|      0|          0|           0|
#|  4|      0|      54|      0|          0|           0|
#+---+-------+--------+-------+-----------+------------+

Here I used between to check whether a value is not an outlier; note that between is inclusive (i.e. x between a and b is logically equivalent to x >= a and x <= b).
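If you prefer not to use when/otherwise, an equivalent sketch is to build the "is outlier" condition directly and cast the boolean to an integer (this reuses the bounds dict from above):

import pyspark.sql.functions as f

# Cast the boolean condition straight to 0/1 instead of using when/otherwise.
outlier_cols = [
    (~f.col(c).between(bounds[c]['lower'], bounds[c]['upper']))
    .cast("int")
    .alias(c + "_out")
    for c in df.columns
]
df.select("*", *outlier_cols).show()

Casting a boolean column to int yields 1 for true and 0 for false, so the negated between condition produces exactly the 0/1 flags shown in the expected output.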

pault
  • Is it possible to solve this problem with my function? If so, please guide me what changes are required to make it work. Thanks for your help. – RSK Oct 04 '18 at 07:25
  • @RSK I don't see any easy way to modify your original code to accomplish this in spark. – pault Oct 04 '18 at 19:27
  • What does this f stand for? In my case it says that there is no variable called f. – Saradamani May 23 '19 at 14:23
  • @Saradamani edited answer to include `import pyspark.sql.functions as f` – pault May 23 '19 at 14:28
  • So these are the observations across all the columns that are within the IQR. Correct me if I am wrong, please. – Saradamani May 23 '19 at 14:32
  • @Saradamani each `_out` column is an indicator that specifies if the value in that row for the corresponding column is an outlier. – pault May 23 '19 at 14:47
  • I have faced a terrible problem. What I am trying to do is to remove all the outliers from the whole data set. Say I have row numbers 1, 2, 4, 7 as outliers but row numbers 3, 5, 6, 8, 9, 10 as not outliers. But here what I get is whether the value of an observation for an attribute is an outlier or not. I want to detect and remove (or isolate) all the outliers detected by something like Mahalanobis distance from the dataset, but this code does not help me with that. Why do I want this? My accuracy is low if I do not delete the outliers from the dataset. Can you do the same for me? – Saradamani May 24 '19 at 11:16
  • What I want to do is this: 1. calculate the column means 2. calculate the variance-covariance matrix 3. add the mean vector to the existing dataset 4. calculate the Mahalanobis distance 5. find the IQR 6. create a column called distance in the dataset 7. df_without_outlier = df(distance < 3rd quartile + 1.5*iqr)) – Saradamani May 24 '19 at 11:25
  • @pault How do I edit your code to just get the outliers for a specific column, let's say take it as an input and just perform this operation on that column only? – mlenthusiast Oct 15 '19 at 00:04
  • @codingenthusiast the dict and list comprehensions are over the columns you want to perform the operation on. In this example, we used `df.columns`, but you can also specify your own subset of columns. – pault Oct 15 '19 at 14:43
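Following up on the comments above about restricting the check to a subset of columns and removing outlier rows entirely, one possible sketch (it reuses the bounds dict computed in the answer; the column subset and variable names are purely illustrative):

from functools import reduce
import pyspark.sql.functions as f

# Only check these columns (illustrative subset, adjust as needed).
cols_to_check = ["age", "balance"]

# Keep a row only if it is within bounds for every checked column.
conditions = [
    f.col(c).between(bounds[c]['lower'], bounds[c]['upper'])
    for c in cols_to_check
]
df_without_outliers = df.filter(reduce(lambda a, b: a & b, conditions))
df_without_outliers.show()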

Please find below my solution:

from pyspark.sql import functions as f


class Outlier:

    def __init__(self, df):
        self.df = df

    def _calculate_bounds(self):
        # Compute q1/q3 for each numeric column and derive the IQR fences.
        bounds = {
            c: dict(
                zip(["q1", "q3"], self.df.approxQuantile(c, [0.25, 0.75], 0))
            )
            for c, d in zip(self.df.columns, self.df.dtypes) if d[1] in ["bigint", "double"]
        }

        for c in bounds:
            iqr = bounds[c]['q3'] - bounds[c]['q1']
            bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
            bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)

        return bounds

    def _flag_outliers_df(self):
        # Keep the value itself where it falls outside the fences, null otherwise.
        bounds = self._calculate_bounds()

        outliers_col = [
            f.when(
                ~f.col(c).between(bounds[c]['min'], bounds[c]['max']),
                f.col(c)
            ).alias(c + '_outlier')
            for c in bounds]

        return self.df.select(*outliers_col)

    def show_outliers(self):
        # Show the non-null (i.e. outlier) values for each column separately.
        outlier_df = self._flag_outliers_df()

        for outlier in outlier_df.columns:
            outlier_df.select(outlier).filter(f.col(outlier).isNotNull()).show()

Then pass your dataframe in as shown below:

Outlier(df).show_outliers()
Florian
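If you want the 0/1 flag columns from the question rather than the outlier values themselves, a small variation on _flag_outliers_df could be added to the class. This is only a sketch (the flag_outliers method below is hypothetical, not part of the original answer):

    def flag_outliers(self):
        """Return the original columns plus a 0/1 `_out` flag per numeric column."""
        bounds = self._calculate_bounds()
        flags = [
            (~f.col(c).between(bounds[c]['min'], bounds[c]['max']))
            .cast("int")
            .alias(c + "_out")
            for c in bounds
        ]
        return self.df.select("*", *flags)

With that method in place, Outlier(df).flag_outliers().show() would return all of the original columns plus the _out flag columns, similar to the expected output in the question.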