2

Description:" How can I fill the missing value in price column with mean, grouping data by condition and model columns in Pyspark? My python code would be like this :cars['price'] = np.ceil(cars['price'].fillna(cars.groupby(['condition', 'model' ])['price'].transform('mean')))

Error: I have tried different code in PySpark, but each time I get a different error. For example, this code:

cars_new = cars.fillna(cars.groupBy("condition", "model").agg(mean("price"))['avg(price)'])

raises:

ValueError: value should be a float, int, long, string, bool or dict
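For reference, fillna in PySpark only accepts a scalar value (or a dict of column-to-value mappings), not a DataFrame, which is why the call above fails. One way to mirror the pandas groupby-transform logic is to compute the group means with groupBy/agg and join them back; a minimal sketch, assuming the columns are named condition, model and price as above (names like cars_filled and mean_price are only illustrative):

import pyspark.sql.functions as F

# mean price per (condition, model) group; mean() ignores nulls
means = cars.groupBy("condition", "model").agg(F.mean("price").alias("mean_price"))

# join the means back, use them only where price is null, then apply the ceiling as np.ceil did
cars_filled = (
    cars.join(means, on=["condition", "model"], how="left")
        .withColumn("price", F.ceil(F.coalesce(F.col("price"), F.col("mean_price"))))
        .drop("mean_price")
)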

DataFrame: (a screenshot of the dataframe was attached here)

Foxbat
  • Please share more information about your dataframe, schema, input dataframe, expected output etc. – vladsiv Dec 01 '21 at 15:04
  • Does this answer your question? [Fill Pyspark dataframe column null values with average value from same column](https://stackoverflow.com/questions/37749759/fill-pyspark-dataframe-column-null-values-with-average-value-from-same-column) – vladsiv Dec 01 '21 at 15:09
  • @vladsiv, I do not think these questions are the same; I have already looked at that question. – Foxbat Dec 01 '21 at 15:13

2 Answers

3

Not sure what your input data looks like, but let's say we have a dataframe like this:

+---------+-----+-----+                                                         
|condition|model|price|
+---------+-----+-----+
|A        |A    |1    |
|A        |B    |2    |
|A        |B    |2    |
|A        |A    |1    |
|A        |A    |null |
|B        |A    |3    |
|B        |A    |null |
|B        |B    |4    |
+---------+-----+-----+

We want to fill the nulls with the average price, computed per condition and model group.

For this we can define a Window partitioned by those columns, calculate the average over it, and then replace the nulls.

Example:

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("test").getOrCreate()
data = [
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "B", "price": 2},
    {"condition": "A", "model": "A", "price": 1},
    {"condition": "A", "model": "A", "price": None},
    {"condition": "B", "model": "A", "price": 3},
    {"condition": "B", "model": "A", "price": None},
    {"condition": "B", "model": "B", "price": 4},
]

# window spanning all rows of each (condition, model) group
window = Window.partitionBy("condition", "model")
df = spark.createDataFrame(data=data)
df = (
    df.withColumn("avg", F.avg("price").over(window))  # group average (nulls ignored)
    .withColumn(
        "price", F.when(F.col("price").isNull(), F.col("avg")).otherwise(F.col("price"))
    )
    .drop("avg")
)
df.show(truncate=False)

Which gives us:

+---------+-----+-----+
|condition|model|price|
+---------+-----+-----+
|A        |A    |1.0  |
|A        |A    |1.0  |
|A        |A    |1.0  |
|B        |B    |4.0  |
|B        |A    |3.0  |
|B        |A    |3.0  |
|A        |B    |2.0  |
|A        |B    |2.0  |
+---------+-----+-----+
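If you also want to round the filled values up to whole numbers, as np.ceil does in the pandas snippet from the question, a small follow-up step could look like this (a sketch applied to the df above):

# round prices up to whole numbers, mirroring np.ceil in the question
df = df.withColumn("price", F.ceil(F.col("price")))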
vladsiv
1

It could be done using window functions like this:

from pyspark.sql import Window
from pyspark.sql.functions import avg, col, when

w = Window.partitionBy('condition', 'model')
# fill null prices with the average over the (condition, model) group, keep existing prices otherwise
cars = cars.withColumn('price', when(col('price').isNull(), avg(col('price')).over(w)).otherwise(col('price')))
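Equivalently, since coalesce returns the first non-null value, the same replacement can be written a bit more compactly (a sketch under the same column-name assumptions):

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('condition', 'model')
# keep the existing price, falling back to the group average when it is null
cars = cars.withColumn('price', F.coalesce(F.col('price'), F.avg('price').over(w)))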
alexopoulos7