
I have a PySpark dataframe with a single column.

| Rank  
|----------
| 10
| 10
| null   
| null     
| 15
| null
| 20
| null     
| null     
| 15
| null   
| 10

I want to impute the missing values using forward fill, like pandas' ffill() function.

Desired Output

| Rank    
|----------
| 10
| 10
| 10   
| 10     
| 15
| 15
| 20
| 20     
| 20     
| 15
| 15   
| 10

Disclaimer: I have seen some solutions on Stack Overflow, but they won't work when you only have a single column as input.
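For reference, this is exactly what pandas' ffill produces on the same column (None is treated as missing):

```python
import pandas as pd

# The question's input column; None marks the missing values.
s = pd.Series([10.0, 10.0, None, None, 15.0, None,
               20.0, None, None, 15.0, None, 10.0])

# ffill carries the last non-missing value forward through runs of nulls.
print(s.ffill().tolist())
# [10.0, 10.0, 10.0, 10.0, 15.0, 15.0, 20.0, 20.0, 20.0, 15.0, 15.0, 10.0]
```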


1 Answer

Please check out the lag and lead functions in Spark.

lag and lead only look at a fixed offset, so they can't fill runs of consecutive nulls. Simply creating a UDF with a global variable should do the trick. Below is a simple example:

var PRV_RANK = 0f  // last non-missing rank seen; shared mutable state

import spark.implicits._

// Missing values are represented as Float.NaN in this single-column example.
val data = spark.sparkContext
  .parallelize(Seq(10f, 10f, Float.NaN, Float.NaN, 15f, Float.NaN, 20f, Float.NaN, Float.NaN, 15f, Float.NaN, 10f))
  .toDF("rank")

// Note: this relies on the rows being processed in order within a single
// partition; the global variable is not shared across executors.
val forwardFill = udf((rank: Float) =>
{
  if (rank == null || rank.equals(Float.NaN)) {
    PRV_RANK            // missing: carry the last seen value forward
  }
  else {
    PRV_RANK = rank     // present: remember it and pass it through
    rank
  }
})

data.withColumn("rankNew", forwardFill($"rank")).show()

Hope this helps!

koiralo
  • Can you pls share the scala version of the code? I know you are good at Spark with scala :D – GeorgeOfTheRF Jun 07 '17 at 10:58
  • I have updated the answer, we could not do with the lag and lead for multiple null values. – koiralo Jun 07 '17 at 12:42
  • No. I dont know how to convert this line "(rank == null || rank.equals(Float.NaN)" to python – GeorgeOfTheRF Jun 12 '17 at 07:47
  • you can do if rank is None or math.isnan(rank): – koiralo Jun 12 '17 at 07:58
  • can you please accept as an answer if this worked for you ? – koiralo Jun 13 '17 at 03:11
  • As recommended by you I used the math.isnan() function in the code given in this question. https://stackoverflow.com/questions/44494002/how-to-check-in-python-if-cell-value-of-pyspark-dataframe-column-in-udf-function?noredirect=1#comment75989638_44494002 – GeorgeOfTheRF Jun 13 '17 at 05:39
  • But I am still getting the same large error log I have listed in the question – GeorgeOfTheRF Jun 13 '17 at 05:40
  • This works perfectly in scala spark you should find a way to do in python. – koiralo Jun 13 '17 at 05:41
  • import math df = spark.createDataFrame( [(1, 1, None), (1, 2, float(5)), (1, 3, float('nan')), (1, 4, None), (1, 5, float(10)), (1, 6, None)], ('session', "timestamp", "id")) PRV_RANK = 0.0 def forwardfil(rank): if rank is None | math.isnan(rank): return PRV_RANK else: PRV_RANK = rank return rank forwardFill = F.udf(forwardfil, FloatType()) df.withColumn("ffill_new", forwardFill(df["id"])).show() – GeorgeOfTheRF Jun 13 '17 at 05:42
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/146480/discussion-between-shankar-koirala-and-ml-pro). – koiralo Jun 13 '17 at 05:43
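The Python attempt pasted in the comments has two bugs: `|` is the bitwise operator (it doesn't short-circuit, so `math.isnan(rank)` raises on `None`), and assigning `PRV_RANK` without a `global` declaration creates a new local variable instead of updating the shared state. A corrected sketch of the UDF body, exercised here in plain Python (in Spark it would be wrapped with `F.udf` and, like the Scala version, only fills correctly when rows arrive in order on a single partition):

```python
import math

PRV_RANK = 0.0  # last non-missing rank seen; shared mutable state

def forwardfill(rank):
    global PRV_RANK  # without this, PRV_RANK = rank creates a local variable
    if rank is None or math.isnan(rank):  # `or` short-circuits past None
        return PRV_RANK                   # missing: carry last value forward
    PRV_RANK = rank                       # present: remember it
    return rank

# The question's input column, with None for the missing values.
values = [10.0, 10.0, None, None, 15.0, None,
          20.0, None, None, 15.0, None, 10.0]
filled = [forwardfill(v) for v in values]
print(filled)
# [10.0, 10.0, 10.0, 10.0, 15.0, 15.0, 20.0, 20.0, 20.0, 15.0, 15.0, 10.0]
```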