8

I am attempting to fill in missing values in my Spark dataframe with the previous non-null value (if it exists). I've done this type of thing in Python/Pandas, but my data is too big for Pandas (on a small cluster) and I'm a Spark noob. Is this something Spark can do? Can it do it for multiple columns? If so, how? If not, any suggestions for alternative approaches within the whole Hadoop suite of tools?

Thanks!

user1624577
  • 547
  • 2
  • 6
  • 15
  • Looks like this [has been asked before](http://stackoverflow.com/questions/36019847/pyspark-forward-fill-with-last-observation-for-a-dataframe), without much success. – chrisaycock Jun 30 '16 at 19:52
  • @chrisaycock - yeah, I've noticed :/ I would think that this would be possible though. – user1624577 Jun 30 '16 at 19:55
  • I believe it's possible using `Window`, but I'm actually working my way through that conceptually right now. Although if your data is large enough to need a cluster, why impute these instead of dropping the observations? Keep in mind when you impute that you're making up data that doesn't exist - it has its uses, but you should still avoid it if you can. – Jeff Jun 30 '16 at 19:59
  • 1
    It looks like you can do it if you convert to an RDD first, then back to a dataframe: http://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation?lq=1 – Jeff Jun 30 '16 at 20:00
  • @JeffL. - In this project I'm going to have to forward fill because while the data does not exist for those dates/times in the dataset, it is assumed in this problem that values are repeated until the value changes. The link you sent is interesting....might have to learn Scala first :S – user1624577 Jun 30 '16 at 20:09
  • 1
    Yeah, I don't know Scala either. The guy who posted the answer though, @zero323, is very active on Spark questions, so he might have input here eventually. – Jeff Jun 30 '16 at 20:18

1 Answer

12

I've found a solution that works without additional coding by using a Window here. So Jeff was right, there is a solution. Full code below; I'll briefly explain what it does, and for more details just look at the blog.

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window
window = Window.orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)

# do the fill 
spark_df_filled = df6.withColumn('temperature_filled',  filled_column_temperature)

So the idea is to define a sliding Window (more on sliding windows here) over the data which always contains the current row and ALL previous ones:

    window = Window.orderBy('time')\
                   .rowsBetween(-sys.maxsize, 0)

Note that we sort by time, so the data is in the correct order. Also note that using "-sys.maxsize" ensures that the window always includes all previous data and keeps growing as it traverses the data top-down, but there might be more efficient solutions.
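
As a small variant (assuming Spark 2.1 or later, where Window.unboundedPreceding and Window.currentRow are available, as one of the comments below also suggests), the same unbounded window can be written with Spark's own boundary constants instead of sys.maxsize:

    # same window, expressed with Spark's built-in boundary constants
    window = Window.orderBy('time')\
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow)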

Using the "last" function, we are always addressing the last row in that window. By passing "ignorenulls=True" we define that if the current row is null, then the function will return the most recent (last) non-null value in the window. Otherwise the actual row's value is used.

Done.

Alceu Costa
  • 9,733
  • 19
  • 65
  • 83
Romeo Kienzler
  • 3,373
  • 3
  • 36
  • 58
  • 7
    Better to use `Window.unboundedPreceding` instead of `-sys.maxsize` https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=column#pyspark.sql.Window – Mithril Apr 16 '19 at 06:06
  • this solution works well however when trying to persist the data I get the following error ```at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200) at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200) at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)``` anyone have a work around? – thePurplePython May 29 '19 at 18:27
  • @Romeo Kienzler https://stackoverflow.com/questions/56368747/spark-caused-by-java-lang-stackoverflowerror-window-function/56369342?noredirect=1#comment99341590_56369342 if you want to take a look – thePurplePython May 29 '19 at 23:08
  • This will not work without using partitionBy, as it will pick up the last value of the column. – Ishan Bhatt Sep 07 '20 at 13:17
  • 1
    Note that this would work only if each row is regularly spaced. The anti-example would be: `df = spark.createDataFrame([ ('mkt', 1, 1), ('mkt', 2, 2), ('mkt', 4, 4), ('mkt', 5, 5), ('mkt', 8, 8) ], schema=['mkt', 'days', 'delta'])` – Shern Mar 03 '21 at 08:51