
How can I replace the null values in the columns Age and Height with the median values in the data set df below?

df = spark.createDataFrame([(1, 'John', 1.79, 28, 'M', 'Doctor'),
                            (2, 'Steve', 1.78, 45, 'M', None),
                            (3, 'Emma', 1.75, None, None, None),
                            (4, 'Ashley', 1.6, 33, 'F', 'Analyst'),
                            (5, 'Olivia', 1.8, 54, 'F', 'Teacher'),
                            (6, 'Hannah', 1.82, None, 'F', None),
                            (7, 'William', None, 42, 'M', 'Engineer'),
                            (None, None, None, None, None, None),
                            (8, 'Ethan', 1.55, 38, 'M', 'Doctor'),
                            (9, 'Hannah', 1.65, None, 'F', 'Doctor'),
                            (10, 'Xavier', 1.64, 43, None, 'Doctor')],
                           ['Id', 'Name', 'Height', 'Age', 'Gender', 'Profession'])

Following the post Replace missing values with mean - Spark Dataframe, I used the Imputer from pyspark.ml.feature:

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns,
    outputCols=["{}_imputed".format(c) for c in df.columns])

imputer.fit(df).transform(df)

It throws the following error:

IllegalArgumentException: 'requirement failed: Column Id must be of type equal to one of the following types: [DoubleType, FloatType] but was actually of type LongType.'
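For context, the schema inferred for the DataFrame above (a quick df.printSchema() check; the commented output is what Spark's type inference should produce) shows that Id and Age come out as LongType, which is exactly what the Imputer rejects:

df.printSchema()
# root
#  |-- Id: long (nullable = true)
#  |-- Name: string (nullable = true)
#  |-- Height: double (nullable = true)
#  |-- Age: long (nullable = true)
#  |-- Gender: string (nullable = true)
#  |-- Profession: string (nullable = true)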

So please help. Thank you

melik
    Possible duplicate of [Replace missing values with mean - Spark Dataframe](https://stackoverflow.com/questions/40057563/replace-missing-values-with-mean-spark-dataframe) – Alper t. Turker Aug 03 '18 at 22:02

2 Answers


It's likely an initial casting issue (in my case I had some strings that needed to be floats). To convert all columns to floats:

from pyspark.sql.functions import col
df = df.select(*(col(c).cast("float").alias(c) for c in df.columns))

Then you should be fine to impute. Note: I set my strategy to median rather than mean.

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns]
    ).setStrategy("median")

# Add imputation cols to df
df = imputer.fit(df).transform(df)
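If you'd rather not cast every column (casting the string columns Name, Gender and Profession to float turns them into nulls), a variant of the same idea is to cast only the numeric columns and restrict the imputer to them. This is a sketch assuming only Age and Height need imputing:

from pyspark.sql.functions import col
from pyspark.ml.feature import Imputer

# Height is already a double; Age is inferred as long, so cast it
df = df.withColumn("Age", col("Age").cast("double"))

imputer = Imputer(
    inputCols=["Age", "Height"],
    outputCols=["Age_imputed", "Height_imputed"]
).setStrategy("median")

df = imputer.fit(df).transform(df)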
kevin_theinfinityfund
  • You can also overwrite the current column names with the imputed values by `outputCols=[c for c in df.columns]`. Also note that sometimes `median` is hard for PySpark to calculate due to the distribution across worker nodes. It seems like `mean` is generally more stable. [This question](https://stackoverflow.com/questions/31432843/how-to-find-median-and-quantiles-using-spark) gives a potential solution. – kevin_theinfinityfund Mar 09 '21 at 20:08
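Following up on that comment, one way to sidestep the Imputer's median computation entirely (a sketch using approxQuantile, not necessarily what the linked post does) is to compute the medians yourself and fill the nulls:

from pyspark.sql.functions import col

# Cast Age to double first so the fill value isn't truncated to a long
df = df.withColumn("Age", col("Age").cast("double"))

# approxQuantile ignores nulls; relativeError=0.0 requests the exact quantile
medians = {c: df.approxQuantile(c, [0.5], 0.0)[0] for c in ["Age", "Height"]}
df = df.na.fill(medians)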

I'd be interested in a more elegant solution, but I imputed the categoricals separately from the numerics. To impute the categoricals, I took the most common value in each column and filled the blanks with it using the when and otherwise functions:

import pyspark.sql.functions as F

for col_name in ['Name', 'Gender', 'Profession']:
    # Most common value, computed over rows without any nulls
    common = (df.dropna()
                .groupBy(col_name)
                .agg(F.count("*").alias("cnt"))
                .orderBy("cnt", ascending=False)
                .first()[col_name])
    df = df.withColumn(col_name, F.when(F.isnull(col_name), common).otherwise(df[col_name]))

To impute the numerics before running the imputer lines, I simply cast the Age and Id columns as doubles, which circumvents the type issue, and restrict the imputer to the numerical columns:

from pyspark.ml.feature import Imputer

df = df.withColumn("Age", df['Age'].cast('double')).withColumn('Id', df['Id'].cast('double'))
imputer = Imputer(
    inputCols=['Id', 'Height', 'Age'],
    outputCols=['Id', 'Height', 'Age'])
imputer.fit(df).transform(df)
kevins_1