I have a question about interpolating values in one column when I have a complete timestamp column ('b' here):

from pyspark.sql import SparkSession
from pyspark import Row

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

df = spark.createDataFrame([Row(a=1, b='2019-09-26 09:53:10', c='7793740'),
                            Row(a=2, b='2019-09-26 09:54:12', c=''),
                            Row(a=3, b='2019-09-26 09:55:11', c='7793742'),
                            Row(a=4, b='2019-09-26 09:56:10', c=''),
                            Row(a=5, b='2019-09-26 09:57:11', c=''),
                            Row(a=6, b='2019-09-26 09:58:10', c='7793745'),
                            Row(a=7, b='2019-09-26 09:59:11', c=''),
                            Row(a=8, b='2019-09-26 10:00:10', c='7793747')])

df = df.withColumn('c', df['c'].cast('int'))
df = df.withColumn('b', df['b'].cast('timestamp'))
df.show()

+---+-------------------+-------+
|  a|                  b|      c|
+---+-------------------+-------+
|  1|2019-09-26 09:53:10|7793740|
|  2|2019-09-26 09:54:12|   null|
|  3|2019-09-26 09:55:11|7793742|
|  4|2019-09-26 09:56:10|   null|
|  5|2019-09-26 09:57:11|   null|
|  6|2019-09-26 09:58:10|7793745|
|  7|2019-09-26 09:59:11|   null|
|  8|2019-09-26 10:00:10|7793747|
+---+-------------------+-------+

In pandas it would be simple, like:

import pandas as pd
import numpy as np

pdf = df.toPandas()

pdf = pdf.set_index('b')
pdf = pdf.interpolate(method='index', axis=0, limit_direction='forward')
pdf.reset_index(inplace=True)

                    b  a             c
0 2019-09-26 09:53:10  1  7.793740e+06
1 2019-09-26 09:54:12  2  7.793741e+06
2 2019-09-26 09:55:11  3  7.793742e+06
3 2019-09-26 09:56:10  4  7.793743e+06
4 2019-09-26 09:57:11  5  7.793744e+06
5 2019-09-26 09:58:10  6  7.793745e+06
6 2019-09-26 09:59:11  7  7.793746e+06
7 2019-09-26 10:00:10  8  7.793747e+06

Can we avoid UDFs in my case? If not, how should I use them? (I'm thinking of the case where I have millions of rows.)

Can we also interpolate in both directions, for cases where the first value is null? Thank you!

cincin21
  • possible duplicate of this [question](https://stackoverflow.com/questions/53077639/pyspark-interpolation-of-missing-values-in-pyspark-dataframe-observed/56708962#56708962) please look also at the comments in my answer – ndricca Jul 20 '19 at 11:29
  • @ndricca Hi, how I bypassed [this](https://stackoverflow.com/questions/53077639/pyspark-interpolation-of-missing-values-in-pyspark-dataframe-observed) post, I really do not know! One question to your great explanation: does it work in interpolation in `both directions` like the `pandas` `interpolate` function? I also like @NeilZ 's udf answer, so should I keep my question or delete it? – cincin21 Jul 22 '19 at 06:31
  • @ndricca One more: could you update your function with suggested code lines mentioned by leo, because it's not working properly. Best! – cincin21 Jul 22 '19 at 08:48
  • my function only works on null values if non-null data exists both before and after them within the selected partition of the dataframe. Moreover, I am no longer using that function, and at the moment I have difficulty accessing a pyspark CLI, so I am not able to apply the suggested edits. I suggest you publish an updated version as a new answer linking back to my answer. – ndricca Jul 22 '19 at 09:17
  • @ndricca Sure, I will try to post it today. Best! – cincin21 Jul 22 '19 at 10:51
  • just one more thing: @NeilZ answer is the best choice if you have spark 2.3+, mine works on older versions (tested on spark2.2) – ndricca Jul 23 '19 at 07:41
  • @ndricca still it has taken me **2min +** to process few rows in `jupyter lab` as my `pycharm` has some problem with `pyarrow` package – cincin21 Jul 23 '19 at 07:48

1 Answer

There seems to be no built-in function for interpolating Spark DataFrame columns. Here is one way to do it: put the pandas interpolation into a pandas UDF.

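from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame([Row(a=1, b='2019-09-26 09:53:10', c='7793740'),
                            Row(a=2, b='2019-09-26 09:54:12', c=''),
                            Row(a=3, b='2019-09-26 09:55:11', c='7793742'),
                            Row(a=4, b='2019-09-26 09:56:10', c=''),
                            Row(a=5, b='2019-09-26 09:57:11', c=''),
                            Row(a=6, b='2019-09-26 09:58:10', c='7793745'),
                            Row(a=7, b='2019-09-26 09:59:11', c=''),
                            Row(a=8, b='2019-09-26 10:00:10', c='7793747')])

# Empty strings become null when cast to int.
df = df.withColumn('c', df['c'].cast('int'))
df = df.withColumn('b', df['b'].cast('timestamp'))

# Constant key: every row falls into one group for the grouped-map UDF.
df = df.withColumn('flag', F.lit(1))
df.show()

# Each group arrives as a pandas DataFrame; the result must match df.schema.
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def interpolate(pdf):
    # method='index' weights by the timestamp index, so index by 'b' and sort.
    pdf = pdf.set_index('b')
    pdf.sort_values(by=['a'], inplace=True)
    pdf = pdf.interpolate(method='index', axis=0, limit_direction='forward')
    pdf.reset_index(inplace=True)
    return pdf

# On Spark 3.0+, groupby(...).applyInPandas(func, schema) is the preferred form.
df = df.groupby(['flag']).apply(interpolate)

df.sort(df['a']).show()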

This outputs:

+---+-------------------+-------+----+
|  a|                  b|      c|flag|
+---+-------------------+-------+----+
|  1|2019-09-26 09:53:10|7793740|   1|
|  2|2019-09-26 09:54:12|7793741|   1|
|  3|2019-09-26 09:55:11|7793742|   1|
|  4|2019-09-26 09:56:10|7793742|   1|
|  5|2019-09-26 09:57:11|7793744|   1|
|  6|2019-09-26 09:58:10|7793745|   1|
|  7|2019-09-26 09:59:11|7793746|   1|
|  8|2019-09-26 10:00:10|7793747|   1|
+---+-------------------+-------+----+
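The filled values can be checked by hand, since `method='index'` weights each gap by elapsed time rather than by row count. Below is a minimal standard-library sketch (the helper name `interpolate_by_time` is hypothetical, not part of pandas or Spark). It also suggests why row 4 reads 7793742 here while the pandas output in the question shows 7.793743e+06: the interpolated value is about 7793742.99, and it appears to be truncated when it is converted back to the `int` column of `df.schema`.

```python
from datetime import datetime


def interpolate_by_time(times, values):
    """Linearly fill None entries, weighting by elapsed time.

    Only gaps with a known value on both sides are filled.
    """
    out = list(values)
    known = [i for i, v in enumerate(values) if v is not None]
    for lo, hi in zip(known, known[1:]):
        span = (times[hi] - times[lo]).total_seconds()
        for i in range(lo + 1, hi):
            frac = (times[i] - times[lo]).total_seconds() / span
            out[i] = values[lo] + frac * (values[hi] - values[lo])
    return out


fmt = '%Y-%m-%d %H:%M:%S'
times = [datetime.strptime(s, fmt) for s in [
    '2019-09-26 09:53:10', '2019-09-26 09:54:12', '2019-09-26 09:55:11',
    '2019-09-26 09:56:10', '2019-09-26 09:57:11', '2019-09-26 09:58:10',
    '2019-09-26 09:59:11', '2019-09-26 10:00:10']]
values = [7793740, None, 7793742, None, None, 7793745, None, 7793747]

filled = interpolate_by_time(times, values)
truncated = [int(v) for v in filled]   # what an int cast does
rounded = [round(v) for v in filled]   # nearest integer instead

print(truncated[3], rounded[3])  # prints: 7793742 7793743
```

If whole numbers are wanted, rounding `c` inside the UDF before returning, or declaring `c` as a double in the output schema, would avoid the truncation.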

If there are millions of rows, you can use two or three flag values, e.g. [1, 2], to split the data into several groups and apply the interpolation to each sub-range. But do use limit_area='inside' so that pandas only interpolates between valid values and does not fill past them. Each flag value will then be left with at most two runs of nulls, one at the start and one at the end of its range. Then re-assign the flags so that those nulls are enclosed by valid data, and re-do the interpolation.

Maybe other people can think of a better method.
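The two-pass idea can be tried in plain pandas before moving it into the UDF. This is only a sketch under assumptions: the chunk size of 4, the half-chunk shift used to re-assign the flags, and the helper name `interpolate_in_chunks` are all hypothetical, and the shift only works if no run of nulls is longer than half a chunk.

```python
import numpy as np
import pandas as pd

# Same timestamps and values as in the question; NaN marks the gaps.
idx = pd.to_datetime([
    '2019-09-26 09:53:10', '2019-09-26 09:54:12', '2019-09-26 09:55:11',
    '2019-09-26 09:56:10', '2019-09-26 09:57:11', '2019-09-26 09:58:10',
    '2019-09-26 09:59:11', '2019-09-26 10:00:10'])
c = pd.Series([7793740, None, 7793742, None, None, 7793745, None, 7793747],
              index=idx, dtype='float64')


def interpolate_in_chunks(series, flags):
    """Interpolate each flag group on its own, filling only enclosed gaps."""
    return series.groupby(flags).transform(
        lambda s: s.interpolate(method='index', limit_area='inside'))


rows = np.arange(len(c))
chunk = 4  # hypothetical chunk size

# Pass 1: fixed chunks. Nulls touching a chunk boundary stay null.
pass1 = interpolate_in_chunks(c, rows // chunk)

# Pass 2: shift the boundaries by half a chunk so that the leftover
# nulls now sit between valid values inside a single group.
pass2 = interpolate_in_chunks(pass1, (rows + chunk // 2) // chunk)

# Reference: interpolating the whole series in one go.
whole = c.interpolate(method='index', limit_area='inside')

print(pass1.isna().sum(), pass2.isna().sum())
```

On this data the first pass leaves the two nulls next to the chunk boundary (rows 4 and 5), and the second pass fills them with the same values as the single-pass interpolation.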

niuer
  • Hi, when I execute your code an error appears `WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 24, localhost, executor driver): TaskKilled (Stage cancelled)`. Not sure what I am doing wrong? – cincin21 Jul 22 '19 at 08:30
  • I added some lines omitted in my answer, they are actually coming from your question. Please try again. – niuer Jul 22 '19 at 15:40
  • There seems to be an issue with my `pyarrow` library in `pycharm`. When I execute the code in `jupyter lab` it works fine. Still, the timing of `2min 18s ± 5.19 s per loop` seems massive! – cincin21 Jul 23 '19 at 06:56
  • Did you use spark cluster when running on jupyter? Compare with normal python to see if it's really massive :) – niuer Jul 23 '19 at 14:43
  • Just opened it on my local machine, still learning Spark and parallel computing, can you explain a bit more on the topic? Thanks! – cincin21 Jul 23 '19 at 17:54
  • Just want to make sure when you run job on jupyter, the whole cluster is working for you. You can check how many executors are running on spark's webpage GUI. – niuer Jul 23 '19 at 18:27