4

I am trying to clean a fairly large time series dataset in Spark that is not fully populated.

What I would like to do is fill the gaps in a straight line between the known values, i.e. take the following dataset

Group | TS          |  Value
____________________________
A     | 01-01-2018  |  1
A     | 01-02-2018  |  2
A     | 01-03-2018  |  
A     | 01-04-2018  |  
A     | 01-05-2018  |  5
A     | 01-06-2018  |  
A     | 01-07-2018  |  10
A     | 01-08-2018  |  11

and convert it to the following

Group | TS          |  Value
____________________________
A     | 01-01-2018  |  1
A     | 01-02-2018  |  2
A     | 01-03-2018  |  3
A     | 01-04-2018  |  4
A     | 01-05-2018  |  5
A     | 01-06-2018  |  7.5
A     | 01-07-2018  |  10
A     | 01-08-2018  |  11

If you can help that would be greatly appreciated.

ZygD
DaveGerson
  • I can't really help you because I don't know Pyspark but it sounds like you want "interpolation". Maybe that's a useful search term for you. – katzenversteher Oct 31 '18 at 06:41
  • Can you explain more what you mean by "straight line"? And how do you get `7.5` in the sixth row? – Ali AzG Oct 31 '18 at 06:54
  • Sure thing Ali, so row 5 is 5 and row 7 is 10. The rule here is that 10 - 5 = 5; if you divide that by 2 you get 2.5. Add 2.5 to row 5 and you get 7.5 (the general rule is sketched just after these comments). – DaveGerson Oct 31 '18 at 08:58
  • Did you manage to solve this? If not I'll put my problem into pandas and do it there. – Reddspark Jan 03 '19 at 18:45
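
In other words, the rule described in the comments is linear interpolation between the two nearest known values. For the 01-06-2018 row:

# Linear interpolation between the two nearest known values:
#   value = start + (end - start) / gap_length * position_in_gap
start, end = 5, 10   # known values at rows 5 and 7
gap_length = 2       # number of steps from row 5 to row 7
position = 1         # row 6 is one step into the gap
print(start + (end - start) / gap_length * position)  # 7.5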

3 Answers

7

After a chat with @ndricca I've updated the code with @leo's suggestions.

First, the DataFrame creation:

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [
    ("A","01-01-2018",1),
    ("A","01-02-2018",2),
    ("A","01-03-2018",None),
    ("A","01-04-2018",None),
    ("A","01-05-2018",5),
    ("A","01-06-2018",None),
    ("A","01-07-2018",10),
    ("A","01-08-2018",11)
]
df = spark.createDataFrame(data,['Group','TS','Value'])
df = df.withColumn('TS',F.unix_timestamp('TS','MM-dd-yyyy').cast('timestamp'))
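
This assumes a SparkSession is already bound to the name spark (as in the pyspark shell or a notebook); if not, a minimal sketch to create one:

from pyspark.sql import SparkSession

# create (or reuse) a SparkSession so that `spark` above is defined
spark = SparkSession.builder.appName("linear-interpolation").getOrCreate()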

Next, the updated function:

def fill_linear_interpolation(df,id_cols,order_col,value_col):
    """
    Apply linear interpolation to dataframe to fill gaps.

    :param df: spark dataframe
    :param id_cols: string or list of column names to partition by the window function
    :param order_col: column to use to order by the window function
    :param value_col: column to be filled

    :returns: spark dataframe updated with interpolated values
    """
    # create row number over window and a column with row number only for non missing values

    w = Window.partitionBy(id_cols).orderBy(order_col)
    new_df = df.withColumn('rn',F.row_number().over(w))
    new_df = new_df.withColumn('rn_not_null',F.when(F.col(value_col).isNotNull(),F.col('rn')))

    # create relative references to the start value (last value not missing)
    w_start = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(Window.unboundedPreceding,-1)
    new_df = new_df.withColumn('start_val',F.last(value_col,True).over(w_start))
    new_df = new_df.withColumn('start_rn',F.last('rn_not_null',True).over(w_start))

    # create relative references to the end value (first value not missing)
    w_end = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(0,Window.unboundedFollowing)
    new_df = new_df.withColumn('end_val',F.first(value_col,True).over(w_end))
    new_df = new_df.withColumn('end_rn',F.first('rn_not_null',True).over(w_end))

    # normalise id_cols to a list so callers can pass either a single column name or a list
    if not isinstance(id_cols, list):
        id_cols = [id_cols]

    # create references to gap length and current gap position
    new_df = new_df.withColumn('diff_rn',F.col('end_rn')-F.col('start_rn'))
    new_df = new_df.withColumn('curr_rn',F.col('diff_rn')-(F.col('end_rn')-F.col('rn')))

    # calculate linear interpolation value
    lin_interp_func = (F.col('start_val')+(F.col('end_val')-F.col('start_val'))/F.col('diff_rn')*F.col('curr_rn'))
    new_df = new_df.withColumn(value_col,F.when(F.col(value_col).isNull(),lin_interp_func).otherwise(F.col(value_col)))

    new_df = new_df.drop('rn', 'rn_not_null', 'start_val', 'end_val', 'start_rn', 'end_rn', 'diff_rn', 'curr_rn')
    return new_df
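
To see how this works, for the 01-06-2018 row in the example data the helper columns come out as rn=6, start_val=5, start_rn=5, end_val=10, end_rn=7, hence diff_rn=2 and curr_rn=1, and the interpolated value is 5 + (10 - 5) / 2 * 1 = 7.5.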

Then we run the function on our DataFrame:

new_df = fill_linear_interpolation(df=df,id_cols='Group',order_col='TS',value_col='Value')
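
To inspect the filled values (they should match the expected table in the question):

new_df.orderBy('Group', 'TS').show()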

I also checked it on the df from my post; you have to create an additional group column first.

cincin21
  • What if the data starts and ends with NULL values, as shown below: data = [ ("A","01-01-2018",None), ("A","01-02-2018",None), ("A","01-03-2018",1), ("A","01-04-2018",2), ("A","01-05-2018",None), ("A","01-06-2018",None), ("A","01-07-2018",5), ("A","01-08-2018",7), ("A","01-09-2018",9), ("A","01-10-2018",None), ("A","01-11-2018",14), ("A","01-12-2018",None), ("A","01-13-2018",None) ] – Navaneeth Sen Dec 17 '19 at 22:25
  • The beginning and end values are not getting filled – Navaneeth Sen Dec 17 '19 at 22:25
  • Possible to do this if the TS column is not datetime values, but rather order integers (1,2,3,4,...)? @cincin21 – wowdavers Dec 03 '20 at 20:14
  • If the last items are null, and you want to replicate the last not null value, use this code (it's in Scala): val w_lastNulls = Window.partitionBy(id_cols).orderBy(order_col) new_df = new_df.withColumn(value_col, when($"$value_col".isNull, last(value_col, true).over(w_lastNulls)).otherwise($"$value_col")) – Adrián García Campos Mar 19 '21 at 09:13
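
For reference, a PySpark equivalent of that Scala snippet, extended with a back-fill for leading nulls (a sketch using the Window and F imports above, applied after the interpolation, with the column names from the example):

id_cols, order_col, value_col = 'Group', 'TS', 'Value'

# forward-fill: take the last non-null value up to and including the current row
w_ffill = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(Window.unboundedPreceding, 0)
# back-fill: take the first non-null value from the current row onwards
w_bfill = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(0, Window.unboundedFollowing)

new_df = new_df.withColumn(value_col, F.coalesce(F.col(value_col), F.last(value_col, True).over(w_ffill)))
new_df = new_df.withColumn(value_col, F.coalesce(F.col(value_col), F.first(value_col, True).over(w_bfill)))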
6

I have implemented a solution that works on Spark 2.2, based mainly on window functions. I hope it can still help someone else!

First, let's recreate the dataframe:

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [
    ("A","01-01-2018",1),
    ("A","01-02-2018",2),
    ("A","01-03-2018",None),
    ("A","01-04-2018",None),
    ("A","01-05-2018",5),
    ("A","01-06-2018",None),
    ("A","01-07-2018",10),
    ("A","01-08-2018",11)
]
df = spark.createDataFrame(data,['Group','TS','Value'])
df = df.withColumn('TS',F.unix_timestamp('TS','MM-dd-yyyy').cast('timestamp'))

Now, the function:

def fill_linear_interpolation(df,id_cols,order_col,value_col):
    """ 
    Apply linear interpolation to dataframe to fill gaps. 

    :param df: spark dataframe
    :param id_cols: string or list of column names to partition by the window function 
    :param order_col: column to use to order by the window function
    :param value_col: column to be filled

    :returns: spark dataframe updated with interpolated values
    """
    # create row number over window and a column with row number only for non missing values
    w = Window.partitionBy(id_cols).orderBy(order_col)
    new_df = new_df.withColumn('rn',F.row_number().over(w))
    new_df = new_df.withColumn('rn_not_null',F.when(F.col(value_col).isNotNull(),F.col('rn')))

    # create relative references to the start value (last value not missing)
    w_start = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(Window.unboundedPreceding,-1)
    new_df = new_df.withColumn('start_val',F.last(value_col,True).over(w_start))
    new_df = new_df.withColumn('start_rn',F.last('rn_not_null',True).over(w_start))

    # create relative references to the end value (first value not missing)
    w_end = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(0,Window.unboundedFollowing)
    new_df = new_df.withColumn('end_val',F.first(value_col,True).over(w_end))
    new_df = new_df.withColumn('end_rn',F.first('rn_not_null',True).over(w_end))

    # create references to gap length and current gap position  
    new_df = new_df.withColumn('diff_rn',F.col('end_rn')-F.col('start_rn'))
    new_df = new_df.withColumn('curr_rn',F.col('diff_rn')-(F.col('end_rn')-F.col('rn')))

    # calculate linear interpolation value
    lin_interp_func = (F.col('start_val')+(F.col('end_val')-F.col('start_val'))/F.col('diff_rn')*F.col('curr_rn'))
    new_df = new_df.withColumn(value_col,F.when(F.col(value_col).isNull(),lin_interp_func).otherwise(F.col(value_col)))

    keep_cols = id_cols + [order_col,value_col]
    new_df = new_df.select(keep_cols)
    return new_df

Finally:

new_df = fill_linear_interpolation(df=df,id_cols='Group',order_col='TS',value_col='Value')
#+-----+-------------------+-----+
#|Group|                 TS|Value|
#+-----+-------------------+-----+
#|    A|2018-01-01 00:00:00|  1.0|
#|    A|2018-01-02 00:00:00|  2.0|
#|    A|2018-01-03 00:00:00|  3.0|
#|    A|2018-01-04 00:00:00|  4.0|
#|    A|2018-01-05 00:00:00|  5.0|
#|    A|2018-01-06 00:00:00|  7.5|
#|    A|2018-01-07 00:00:00| 10.0|
#|    A|2018-01-08 00:00:00| 11.0|
#+-----+-------------------+-----+
ndricca
  • This is great, thank you! A couple of things to make it more usable: 1) df isn't actually used in the function; it needs a `new_df = df...`. 2) id_cols has to be a list; I added `if not isinstance(id_cols, list): id_cols = [id_cols]`. 3) the keep_cols approach drops other possibly desirable columns in df; better to drop all the interim columns created, like `new_df = new_df.drop('rn', 'rn_not_null', 'start_val', 'end_val', 'start_rn', 'end_rn', 'diff_rn', 'curr_rn')` – robinovitch61 Jul 11 '19 at 22:51
  • Thanks for your corrections. Actually, when I tried to apply this function to my dataframe I had some computational issues: the show() action on the resulting dataframe never finished. Maybe I will create a new question linking back to this one. – ndricca Jul 12 '19 at 15:34
  • Possible to do this if the TS column is not datetime values, but rather order integers (1,2,3,4,...)? @ndricca – wowdavers Dec 03 '20 at 20:14
  • @wowdavers yes, that's why I called the argument "order_col": the minimum requirement for that column is that it is incremental – ndricca Dec 25 '20 at 18:30
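
For example, with the updated fill_linear_interpolation from the top answer, an integer order column works just as well (a sketch adding a hypothetical idx column):

# build a per-group integer index and interpolate over it instead of the timestamp
w_idx = Window.partitionBy('Group').orderBy('TS')
df_idx = df.withColumn('idx', F.row_number().over(w_idx))
new_df = fill_linear_interpolation(df=df_idx, id_cols='Group', order_col='idx', value_col='Value')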
0

If you can spend some computation time, you can use the interpolation techniques of pandas via applyInPandas (PySpark >= 3.0.0). This approach also supports interpolation by time.

# pandas function applied per group: interpolate the Value column over the timestamp index
def impute(pdf):
    pdf['Value'] = pdf.set_index('TS')['Value'].interpolate(method='time').values
    return pdf

# calculate imputed values (sdf is the original Spark DataFrame, e.g. df from the question)
sdf_impute = sdf.groupby('Group').applyInPandas(impute, "Group string, TS timestamp, Value float")
sdf_impute = sdf_impute.withColumnRenamed('Value', 'Value_impute')

# then you can e.g. join the imputed values back onto the original
sdf.join(sdf_impute, ['Group', 'TS'], 'left')
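
A minimal sketch of running this end to end on the df built in the earlier answers (assuming pandas and pyarrow are available to the Spark workers):

sdf = df  # the Spark DataFrame from the question, with TS already cast to timestamp
sdf_impute = (sdf.groupby('Group')
                 .applyInPandas(impute, "Group string, TS timestamp, Value float")
                 .withColumnRenamed('Value', 'Value_impute'))
sdf.join(sdf_impute, ['Group', 'TS'], 'left').orderBy('TS').show()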
Jan_ewazz