1

I need to join two spark dataframes on a timestamp column. The problem is that they have different frequencies: the first dataframe (df1) has an observation every 10 minutes, while the second one (df2) is 25 hz (25 observations every sec, which is 15000 times more frequent than df1). Each dataframe has over 100 columns, and millions of rows. To make a smooth join, I am trying to resample df1 down to 25 hz, front fill the Null values caused by resampling, and then join the dataframes once they are at the same frequency. The dataframes are too big, which is why I'm trying to use spark instead of pandas.

So, here is the question: let's say, I have the following spark dataframe:

sample_df

I want to resample it down to 25 hz (25 observations per sec), so that it would look like this:

Expected_result

How to do that efficiently in pyspark?

Note:

I tried to resample my df1 using the code from an earlier question (PySpark: how to resample frequencies) as below:

from pyspark.sql.functions import col, max as max_, min as min_

freq = x   # x is the frequency in seconds

epoch = (col("timestamp").cast("bigint") / freq).cast("bigint") * freq 

with_epoch  = df1.withColumn("dummy", epoch)

min_epoch, max_epoch = with_epoch.select(min_("dummy"), max_("dummy")).first()

new_df = spark.range(min_epoch, max_epoch + 1, freq).toDF("dummy")

new_df.join(with_epoch, "dummy", "left").orderBy("dummy")
.withColumn("timestamp_resampled", col("dummy").cast("timestamp"))

It seems, the above code only works when the intended frequency is more than or equal to a sec. For example, when freq = 1, it produces the following table:

undesired_result

However, when I pass 25 hz as the frequency (i.e. freq = 1/25) the code fails, because the 'step' in the spark.range function can not be less than 1.

Is there a workaround to solve this issue? Or any other way to re-sample the frequency down to milliseconds?

M. Mate
  • 47
  • 8
  • I had a similar case, instead of using range with step = 1/25, have you tried converting your epoch to millisecond first (multiply by 1000) then your new step in range would be 40, and you could then convert your epoch back to timestamp using : from_unixtime with a format : "yyyy-MM-dd'T'HH:mm:ss.SSS" ? In my version of SparkSQL Timestamp are stored in second, but I have another backend capturing timeseries stored in nanosecond. To join and resample I had to rescale the latter by a factor 10^9 first then it worked. – Yoan B. M.Sc Aug 28 '20 at 18:30

1 Answers1

1

If your objective is to join 2 dataframes, I'd suggest to use an inner join directly:

df = df1.join(df2, df1.Timestamp == df2.Timestamp)

However, if you want to try to downsample the dataframe, you can convert timestamp to miliseconds an keep those rows that mod(timestamp, 25) == 0. You can use this only if you are sure that data is sampled perfectly.

from pyspark.sql.functions import col
df1 = df1.filter( ((col("Timestamp") % 25) == 0 )

Other option is to number each row and keep 1 every 25. With this solution, you are going to reduce rows without considering the timestamp. Another problem of this solution is that you need to sort data (not efficient).

PD: Premature optimization is the root of all evil

Edit: Timestamp to int

Let's create a fake dataset full of timestamps using epoch standard with miliseconds.

>>>  df = sqlContext.range(1559646513000, 1559646520000)\
                    .select( (F.col('id')/1000).cast('timestamp').alias('timestamp'))
>>> df
DataFrame[timestamp: timestamp]
>>> df.show(5,False)
+-----------------------+
|timestamp              |
+-----------------------+
|2019-06-04 13:08:33    |
|2019-06-04 13:08:33.001|
|2019-06-04 13:08:33.002|
|2019-06-04 13:08:33.003|
|2019-06-04 13:08:33.004|
+-----------------------+
only showing top 5 rows

Now, convert back to integers:

>>> df.select( (df.timestamp.cast('double')*1000).cast('bigint').alias('epoch') )\
      .show(5, False)
+-------------+
|epoch        |
+-------------+
|1559646513000|
|1559646513001|
|1559646513002|
|1559646513003|
|1559646513004|
+-------------+
only showing top 5 rows
Daniel Argüelles
  • 2,229
  • 1
  • 33
  • 56
  • Hi Daniel, thanks for the good idea. I was thinking of direct joining as an option B (though, was thinking of using 'outer' join to keep all the possible rows), and looking at your remarks on the resampling, it seems resampling would have a lot of issues, so I will probably go with the join option. Still, could you please eleborate on converting timestamp to milliseconds? How to do that? Thanks! – M. Mate Jun 04 '19 at 11:55