
I have a dataframe in PySpark. Here is what it looks like:

+---------+---------+
|timestamp| price   |
+---------+---------+
|670098928|  50     |
|670098930|  53     |
|670098934|  55     |
+---------+---------+

I want to fill in the gaps in `timestamp` with the previous state, so that I get a complete set from which to calculate time-weighted averages. Here is what the output should look like:

+---------+---------+
|timestamp| price   |
+---------+---------+
|670098928|  50     |
|670098929|  50     | 
|670098930|  53     |
|670098931|  53     |
|670098932|  53     |
|670098933|  53     |
|670098934|  55     |
+---------+---------+

Eventually, I want to persist this new dataframe on disk and visualize my analysis.

How do I do this in PySpark? (For simplicity's sake, I have kept just 2 columns here; my actual dataframe has 89 columns and ~670 million records before filling the gaps.)

van_d39
  • You could do interpolation with scipy. I'm not too sure PySpark can do what you want – OneCricketeer Aug 18 '16 at 00:30
  • @cricket_007 Spark cannot do that. Veenit, I'm not sure why you even want to do that? – eliasah Aug 18 '16 at 06:12
  • @eliasah I'm trying to create a dataframe with a record for each timestamp (the lowest level of granularity), so that if I want to do time-weighted averages, it's much more convenient. – van_d39 Aug 18 '16 at 14:07

1 Answer


You can generate a timestamp range for each row, explode (flatten) it, and then aggregate so that each timestamp keeps a single price:

import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, ArrayType

# Sample data
a = sc.parallelize([[670098928, 50], [670098930, 53], [670098934, 55]])\
    .toDF(['timestamp', 'price'])

# UDF that expands each timestamp into the next 5 consecutive timestamps
# (wrapped in list() so it also works on Python 3, where range() is lazy)
f = func.udf(lambda x: list(range(x, x + 5)), ArrayType(IntegerType()))

# Explode the generated ranges, then keep one price per timestamp
a.withColumn('timestamp', f(a.timestamp))\
    .withColumn('timestamp', func.explode(func.col('timestamp')))\
    .groupBy('timestamp')\
    .agg(func.max(func.col('price')))\
    .show()

+---------+----------+
|timestamp|max(price)|
+---------+----------+
|670098928|        50|
|670098929|        50|
|670098930|        53|
|670098931|        53|
|670098932|        53|
|670098933|        53|
|670098934|        55|
|670098935|        55|
|670098936|        55|
|670098937|        55|
|670098938|        55|
+---------+----------+
Alexis Benichoux
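Since the question also mentions persisting the result to disk and visualizing it, here is a minimal sketch of that step. It is not part of the original answer; the output path, the `filled` name, and the use of Parquet/pandas are assumptions:

# Hypothetical follow-up: persist the gap-filled dataframe and pull a small
# slice to the driver for visualization.
filled = a.withColumn('timestamp', f(a.timestamp))\
    .withColumn('timestamp', func.explode(func.col('timestamp')))\
    .groupBy('timestamp')\
    .agg(func.max(func.col('price')).alias('price'))

# Write the result to disk (path is hypothetical)
filled.write.mode('overwrite').parquet('/tmp/filled_prices')

# Only collect a small subset to the driver before plotting
pdf = filled.limit(1000).toPandas()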
  • I get `AttributeError: 'JavaMember' object has no attribute 'parseDataType'` when I execute `f=func.udf(lambda x:range(x,x+5),ArrayType(IntegerType()))` – van_d39 Aug 18 '16 at 15:13
  • No. It doesn't. Which version of Spark are you on? I'm on 2.0.0 – van_d39 Aug 23 '16 at 16:44
  • I am on 1.6.0, but if you can't define a simple UDF there is something wrong in your environment. – Alexis Benichoux Aug 24 '16 at 13:08
  • You can remove the UDF and replace it with a map on the RDD: replace `a.withColumn('timestamp',f(a.timestamp))\` with `a.map(lambda row:(range(row[0],row[0]+5),row[1])).toDF(['timestamp','price'])\` – Alexis Benichoux Aug 24 '16 at 13:09
  • The above code works, but it doesn't fully solve my problem. In the UDF you hardcode `x+5`, so what if the gap between two numbers is more than `5`? One answer would be to replace `5` with Integer.MAX_VALUE, but then there needs to be a constraint for the last number. Ultimately the timestamp is a time, so I would want to explode it at "seconds" or "milliseconds" granularity. – van_d39 Jan 26 '17 at 20:21
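On the variable-gap concern raised in the last comment: one possible sketch, not from the original answer, uses a window `lead` to compute the distance to the next timestamp instead of the hardcoded `+5`. Note that an un-partitioned window pulls all rows onto a single partition, so this may not scale well to ~670 million records without partitioning the window by some key:

from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, ArrayType

w = Window.orderBy('timestamp')

# Gap to the next timestamp; the last row has no successor, so default to 1
with_gap = a.withColumn('gap', func.lead('timestamp').over(w) - func.col('timestamp'))

# Expand each row into exactly `gap` consecutive timestamps
expand = func.udf(lambda ts, gap: list(range(ts, ts + (gap or 1))),
                  ArrayType(IntegerType()))

filled = with_gap.withColumn('timestamp', func.explode(expand('timestamp', 'gap')))\
    .drop('gap')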