I'm trying to aggregate my data by taking the sum over every 30 seconds. I would like windows that contain no rows to still appear in the result, with a sum of zero, so I can tell when nothing happened in that 30-second window.
Here's a minimal working example illustrating the result I would like with pandas, and where it falls short with pyspark.
Input data
import pandas as pd
from pyspark.sql import functions as F
df = pd.DataFrame(
    [
        (17, "2017-03-10T15:27:18+00:00"),
        (13, "2017-03-10T15:27:29+00:00"),
        (25, "2017-03-10T15:27:30+00:00"),
        (101, "2017-03-10T15:29:00+00:00"),
        (99, "2017-03-10T15:29:29+00:00"),
    ],
    columns=["dollars", "timestamp"],
)
df["timestamp"] = pd.to_datetime(df["timestamp"])
print(df)
dollars timestamp
0 17 2017-03-10 15:27:18+00:00
1 13 2017-03-10 15:27:29+00:00
2 25 2017-03-10 15:27:30+00:00
3 101 2017-03-10 15:29:00+00:00
4 99 2017-03-10 15:29:29+00:00
Pandas solution
With pandas, we can use resample to bucket the data into 30-second windows and then apply sum over each window (note the zero results for 2017-03-10 15:28:00+00:00 and 2017-03-10 15:28:30+00:00):
desired_result = df.set_index("timestamp").resample("30S").sum()
desired_result
dollars
timestamp
2017-03-10 15:27:00+00:00 30
2017-03-10 15:27:30+00:00 25
2017-03-10 15:28:00+00:00 0
2017-03-10 15:28:30+00:00 0
2017-03-10 15:29:00+00:00 200
PySpark near-solution
In pyspark, we can use pyspark.sql.functions.window to group into 30-second windows (adapted, with thanks, from this Stack Overflow answer), but this misses the windows in which there are no rows:
spark: pyspark.sql.session.SparkSession # I expect you to have set up your session...
sdf = spark.createDataFrame(df)
sdf.groupby(
    F.window("timestamp", windowDuration="30 seconds", slideDuration="30 seconds")
).agg(F.sum("dollars")).display()
window                                                                             sum(dollars)
{"start": "2017-03-10T15:27:30.000+0000", "end": "2017-03-10T15:28:00.000+0000"}   25
{"start": "2017-03-10T15:27:00.000+0000", "end": "2017-03-10T15:27:30.000+0000"}   30
{"start": "2017-03-10T15:29:00.000+0000", "end": "2017-03-10T15:29:30.000+0000"}   200
Question
How do I get pyspark to return window results for time windows in which there are no rows (as pandas does)?
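For reference, one workaround I can imagine is to generate every 30-second window start myself, left-join the aggregation onto that, and fill the missing sums with zero. Below is a minimal, untested sketch of that idea (the column names lo, hi and start are just placeholders of mine); I would much prefer something built in if it exists:

from pyspark.sql import functions as F

# Aggregate into 30-second tumbling windows as before, keeping the window struct.
agg = sdf.groupby(F.window("timestamp", "30 seconds")).agg(
    F.sum("dollars").alias("dollars")
)

# Build every 30-second window start between the first and last observed window
# starts (sequence() accepts timestamps plus an interval step from Spark 2.4 on).
starts = sdf.select(F.window("timestamp", "30 seconds").start.alias("start"))
bounds = starts.agg(F.min("start").alias("lo"), F.max("start").alias("hi"))
all_starts = bounds.select(
    F.explode(F.sequence("lo", "hi", F.expr("interval 30 seconds"))).alias("start")
)

# Left-join the aggregation onto the full set of starts and fill the gaps with 0.
result = (
    all_starts.join(agg.withColumn("start", F.col("window.start")), "start", "left")
    .select("start", F.coalesce("dollars", F.lit(0)).alias("dollars"))
    .orderBy("start")
)
result.show(truncate=False)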