
I'm trying to aggregate my data by computing the sum over every 30-second window. I would also like to know when the result of this aggregation is zero, which happens when there are no rows in that 30-second window.

Here's a minimal working example illustrating the result I would like (achieved with pandas) and where it falls short with pyspark.

Input data

import pandas as pd
from pyspark.sql import functions as F

df = pd.DataFrame(
    [
        (17, "2017-03-10T15:27:18+00:00"),
        (13, "2017-03-10T15:27:29+00:00"),
        (25, "2017-03-10T15:27:30+00:00"),
        (101, "2017-03-10T15:29:00+00:00"),
        (99, "2017-03-10T15:29:29+00:00")
    ],
    columns=["dollars", "timestamp"],
)
df["timestamp"] = pd.to_datetime(df["timestamp"])
print(df)
   dollars                 timestamp
0       17 2017-03-10 15:27:18+00:00
1       13 2017-03-10 15:27:29+00:00
2       25 2017-03-10 15:27:30+00:00
3      101 2017-03-10 15:29:00+00:00
4       99 2017-03-10 15:29:29+00:00

Pandas solution

With pandas, we can use resample to aggregate over every 30-second window and then apply the sum function to each window (note the results for 2017-03-10 15:28:00+00:00 and 2017-03-10 15:28:30+00:00):

desired_result = df.set_index("timestamp").resample("30S").sum()
desired_result
                           dollars
timestamp
2017-03-10 15:27:00+00:00       30
2017-03-10 15:27:30+00:00       25
2017-03-10 15:28:00+00:00        0
2017-03-10 15:28:30+00:00        0
2017-03-10 15:29:00+00:00      200
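
(Aside: the zeros appear because pandas treats the sum of an empty bin as 0. If you instead wanted empty bins flagged as missing, a minimal sketch using the resampler's min_count argument would be:)

# Sketch: require at least one observation per bin, so empty 30-second
# bins come back as NaN instead of 0.
nan_for_empty = df.set_index("timestamp").resample("30S").sum(min_count=1)
print(nan_for_empty)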

PySpark near solution

In pyspark, we can use pyspark.sql.functions.window to window over every 30 seconds (adapted, with thanks, from this Stack Overflow answer), but this misses the windows in which there are no rows:

spark: pyspark.sql.session.SparkSession  # I expect you to have set up your session...
sdf = spark.createDataFrame(df)
sdf.groupby(
    F.window("timestamp", windowDuration="30 seconds", slideDuration="30 seconds")
).agg(F.sum("dollars")).display()
window                                                                             sum(dollars)
{"start": "2017-03-10T15:27:30.000+0000", "end": "2017-03-10T15:28:00.000+0000"}   25
{"start": "2017-03-10T15:27:00.000+0000", "end": "2017-03-10T15:27:30.000+0000"}   30
{"start": "2017-03-10T15:29:00.000+0000", "end": "2017-03-10T15:29:30.000+0000"}   200

Question

How do I get pyspark to return results for the time windows where there are no rows (as pandas does)?

– James Owers

2 Answers


You can use timestamp arithmetic, as mentioned in this answer (I recommend you take a look at it, as it goes into the details). In your case it would be:

from pyspark.sql import functions as F

seconds = 30
# Truncate each timestamp to the start of its 30-second bucket (epoch seconds).
epoch = (F.col("timestamp").cast("timestamp").cast("bigint") / seconds).cast(
    "bigint"
) * seconds
df = spark.createDataFrame(
    [
        (17, "2017-03-10T15:27:18+00:00"),
        (13, "2017-03-10T15:27:29+00:00"),
        (25, "2017-03-10T15:27:30+00:00"),
        (101, "2017-03-10T15:29:00+00:00"),
        (99, "2017-03-10T15:29:29+00:00"),
    ],
    ["dollars", "timestamp"],
).withColumn("epoch", epoch)

# Bucket range present in the data (note: .first() triggers a Spark action).
min_epoch, max_epoch = df.select(F.min("epoch"), F.max("epoch")).first()

# One row per 30-second bucket between min and max, including the empty ones.
ref = spark.range(min_epoch, max_epoch + seconds, seconds).toDF("epoch")

(
    ref.join(df, "epoch", "left")  # left join keeps the buckets with no rows
    .withColumn("ts_resampled", F.timestamp_seconds("epoch"))
    .groupBy("ts_resampled")
    .sum("dollars")
    .orderBy("ts_resampled")
    .fillna(0, subset=["sum(dollars)"])  # empty buckets sum to null -> 0
    .show(truncate=False)
)

Output

+-------------------+------------+
|ts_resampled       |sum(dollars)|
+-------------------+------------+
|2017-03-10 12:27:00|30          |
|2017-03-10 12:27:30|25          |
|2017-03-10 12:28:00|0           |
|2017-03-10 12:28:30|0           |
|2017-03-10 12:29:00|200         |
+-------------------+------------+
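
(Note that these timestamps are rendered in the Spark session's local time zone, which is presumably why they read 12:27 rather than 15:27. If you want them to line up with the pandas UTC output, one option, assuming you are free to change the session config, is:)

# Render timestamps in UTC so the resampled buckets match the pandas output above.
spark.conf.set("spark.sql.session.timeZone", "UTC")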
– ottovon

Same solution as ottovon's, but using Spark 2.4 and without the `first` action (the reference range is built with `sequence` instead of collecting min/max to the driver):

from pyspark.sql import functions as F

seconds = 30
# Truncate each timestamp to the start of its 30-second bucket (epoch seconds).
epoch = (F.col("timestamp").cast("timestamp").cast("bigint") / seconds).cast(
    "bigint"
) * seconds
df = spark.createDataFrame(
    [
        (17, "2017-03-10T15:27:18+00:00"),
        (13, "2017-03-10T15:27:29+00:00"),
        (25, "2017-03-10T15:27:30+00:00"),
        (101, "2017-03-10T15:29:00+00:00"),
        (99, "2017-03-10T15:29:29+00:00"),
    ],
    ["dollars", "timestamp"],
).withColumn("epoch", epoch)

# One row per 30-second bucket, built lazily with sequence/explode (no action).
ref = df.select(
    F.min("epoch").alias("min_epoch"), F.max("epoch").alias("max_epoch")
).select(
    F.explode(F.sequence("min_epoch", "max_epoch", F.lit(seconds))).alias("epoch")
)

ref.show()
+----------+                                                                    
|     epoch|
+----------+
|1489159620|
|1489159650|
|1489159680|
|1489159710|
|1489159740|
+----------+
resampled_df = (
    ref.join(df, "epoch", "left")  # left join keeps the buckets with no rows
    .withColumn("ts_resampled", F.from_unixtime("epoch"))
    .groupBy("ts_resampled")
    .agg(F.coalesce(F.sum("dollars"), F.lit(0)).alias("dollars"))  # null sums -> 0
    .orderBy("ts_resampled")
)

resampled_df.show()
+-------------------+-------+                                                   
|       ts_resampled|dollars|
+-------------------+-------+
|2017-03-10 15:27:00|     30|
|2017-03-10 15:27:30|     25|
|2017-03-10 15:28:00|      0|
|2017-03-10 15:28:30|      0|
|2017-03-10 15:29:00|    200|
+-------------------+-------+
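
If you need this in several places, the same logic can be wrapped in a small helper. The function below is only an illustrative sketch of the steps above (resample_sum is a made-up name, not part of the answer):

def resample_sum(sdf, ts_col, value_col, seconds=30):
    # Sum value_col per fixed-width bucket, emitting 0 for empty buckets.
    epoch = (F.col(ts_col).cast("timestamp").cast("bigint") / seconds).cast(
        "bigint"
    ) * seconds
    bucketed = sdf.withColumn("epoch", epoch)
    # One row per bucket between the min and max observed buckets.
    ref = bucketed.select(
        F.min("epoch").alias("lo"), F.max("epoch").alias("hi")
    ).select(F.explode(F.sequence("lo", "hi", F.lit(seconds))).alias("epoch"))
    return (
        ref.join(bucketed, "epoch", "left")
        .withColumn("ts_resampled", F.from_unixtime("epoch"))
        .groupBy("ts_resampled")
        .agg(F.coalesce(F.sum(value_col), F.lit(0)).alias(value_col))
        .orderBy("ts_resampled")
    )

resample_sum(df.drop("epoch"), "timestamp", "dollars").show()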
– Steven
  • In `epoch` you are only transforming the date to an epoch, no? I tried it with a column containing only dates and it doesn't seem to work very well; I think it is better to use `epoch = F.unix_timestamp(F.col('date'),"yyyy-MM-dd")`. That way it is more understandable, and you can define the format. – set92 Nov 02 '21 at 07:38
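
(For reference, a minimal sketch of what this comment suggests; the 'date' column name and the yyyy-MM-dd format are only illustrative:)

# 'date' is a hypothetical column of strings like '2017-03-10'; unix_timestamp
# with an explicit format makes the parsing step visible, and the same bucket
# arithmetic as above can then be applied on top of it.
epoch = (F.unix_timestamp(F.col("date"), "yyyy-MM-dd") / seconds).cast("bigint") * seconds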