I need to join two Spark dataframes on a timestamp column. The problem is that they have different frequencies: the first dataframe (df1) has an observation every 10 minutes, while the second one (df2) is sampled at 25 Hz (25 observations per second, i.e. 15000 times more frequent than df1). Each dataframe has over 100 columns and millions of rows. To make a smooth join, I am trying to resample df1 to 25 Hz, forward-fill the null values introduced by the resampling, and then join the two dataframes once they are at the same frequency. The dataframes are too big to handle with pandas, which is why I am using Spark.
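For the forward-fill step, what I have in mind is roughly the following, applied after the resampled 25 Hz grid has been joined back to df1 (just a sketch; resampled, value and value_ffilled are placeholder names, not my real columns):

from pyspark.sql import Window
from pyspark.sql.functions import col, last

# after the grid join, rows that exist only in the 25 Hz grid have nulls in
# df1's columns; carry the last known non-null value forward in time
w = (Window.orderBy("timestamp_resampled")
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# note: a window without partitionBy pulls everything into one partition,
# which worries me given the size of the data
filled = resampled.withColumn(
    "value_ffilled",
    last(col("value"), ignorenulls=True).over(w),
)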
So, here is the question: let's say I have the following Spark dataframe:
I want to resample it to 25 Hz (25 observations per second), so that it would look like this:
How can I do that efficiently in PySpark?
Note:
I tried to resample my df1 using the code from an earlier question (PySpark: how to resample frequencies) as below:
from pyspark.sql.functions import col, max as max_, min as min_

freq = x  # x is the frequency in seconds

# snap each timestamp to the start of its freq-second bucket
epoch = (col("timestamp").cast("bigint") / freq).cast("bigint") * freq
with_epoch = df1.withColumn("dummy", epoch)
min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()

# generate the full grid of epochs and left-join the original data onto it
new_df = spark.range(min_epoch, max_epoch + 1, freq).toDF("dummy")
resampled = (new_df.join(with_epoch, "dummy", "left")
             .orderBy("dummy")
             .withColumn("timestamp_resampled", col("dummy").cast("timestamp")))
It seems the above code only works when the intended sampling interval is one second or longer. For example, when freq = 1, it produces the following table:
However, when I pass 25 Hz as the frequency (i.e. freq = 1/25) the code fails, because the step argument of spark.range must be an integer, so it cannot represent a fraction of a second.
Is there a workaround for this issue, or any other way to resample the frequency down to milliseconds?
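For what it's worth, the only direction I have come up with myself is to do everything in integer milliseconds so that the step passed to spark.range is a whole number; a rough, untested sketch (step_ms, grid and resampled_ms are just names I made up):

from pyspark.sql.functions import col, max as max_, min as min_

step_ms = 40  # 1000 ms / 25 Hz = 40 ms between samples

# snap each timestamp to a 40 ms bucket, expressed as integer milliseconds since the epoch
epoch_ms = (col("timestamp").cast("double") * 1000 / step_ms).cast("bigint") * step_ms
with_epoch = df1.withColumn("dummy", epoch_ms)
min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()

# the step is now the integer 40, so spark.range accepts it
grid = spark.range(min_epoch, max_epoch + 1, step_ms).toDF("dummy")
resampled_ms = (grid.join(with_epoch, "dummy", "left")
                .withColumn("timestamp_resampled", (col("dummy") / 1000).cast("timestamp"))
                .orderBy("dummy"))

I have not been able to verify how this behaves on dataframes of this size, so I would still appreciate a better or more idiomatic approach.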