0

I have timeseries data that looks a bit like this (timestamp, value):

14 Dec 2020    1000
15 Jan 2021    1000
20 Jan 2021    1000
18 Feb 2021    1000
03 Mar 2021    1000

I'm essentially trying to get monthly values, smoothing out the value for every month. Each row represents the "value" between the two dates, so if we wanted to calculate the value for January, we'd need the value to represent:

15 days of January from the value in December + 5 days between Jan 15 - Jan 20 + 11 days between Jan 20 - Feb 18.

Value would be calculated as number of days relevant to the current month / length of whole interval * value:

Value for Jan: (15/32) * 1000 + (5/5) * 1000 + (11/28) * 1000

I've tried using resampling with the window function, but resampling on 1 month gives me an exception and also it simply returns the intervals instead of resampling everything.

Any advice is appreciated. Thanks.

Ehrendil
  • 233
  • 3
  • 13

1 Answers1

2

You can interpolate the values between the dates using sequence, then group by the month and average over the values in each month.

EDIT: used an UDF from this answer because sequence is not supported for Spark 2.2

import pyspark.sql.functions as F
from pyspark.sql.types import *
import datetime

def generate_date_series(start, stop):
    return [start + datetime.timedelta(days=x) for x in range(0, (stop-start).days + 1)]    

spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))

result = df.withColumn(
    'timestamp',
    F.to_date(F.col('timestamp'), 'dd MMM yyyy')
).withColumn(
    'next_timestamp',
    F.expr("""
        generate_date_series(
            lag(timestamp, 1, timestamp + interval 1 day) -- need a default value for the last row
                over(order by timestamp) + interval 1 day,  -- don't want to include the previous date
            timestamp
        )
    """)
).select(
    F.explode('next_timestamp').alias('timestamp'), 
    (F.col('value') / F.size('next_timestamp')).alias('value')
).groupBy(
    F.year('timestamp').alias('year'),
    F.month('timestamp').alias('month')
).agg(
    F.sum('value').alias('value')
).orderBy('year', 'month')

result.show(truncate=False)
+----+-----+------------------+
|year|month|value             |
+----+-----+------------------+
|2020|12   |531.25            |
|2021|1    |1848.0603448275874|
|2021|2    |1389.920424403183 |
|2021|3    |230.76923076923077|
+----+-----+------------------+
mck
  • 40,932
  • 13
  • 35
  • 50
  • Hi, I'm getting an exception saying "Undefined function 'sequence'. This function is neither a registered temporary function nor a permanent function registered in the database." Any thoughts? – Ehrendil Jan 06 '21 at 09:58
  • I have 2.2.0 installed. – Ehrendil Jan 06 '21 at 10:30
  • @Ehrendil this only works in spark 2.4 or above. Is it possible for you to update your spark version? 2.2 is quite old – mck Jan 06 '21 at 10:30
  • Unfortunately I think that's not something I can do. Company policies and such... Is there any hint without sequence you could think of? – Ehrendil Jan 06 '21 at 10:31
  • @Ehrendil I edited to use a python udf. Does that help? – mck Jan 06 '21 at 10:55
  • A bit. Thank you. It's giving me an error that the module object has no attribute explode_outer. Can I do it without it? – Ehrendil Jan 06 '21 at 10:58
  • @Ehrendil my bad, should have used `explode`. – mck Jan 06 '21 at 10:59
  • Are you sure we're getting the right values, though? Jan here should be 1861,607? – Ehrendil Jan 06 '21 at 11:21
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/226924/discussion-between-mck-and-ehrendil). – mck Jan 06 '21 at 12:15