Let's say I have the following pandas dataframe with a non-standard timestamp column without datetime format. Due to I need to include a new column and convert it into an 24hourly-based timestamp for time-series visualizing matter by:
df['timestamp(24hrs)'] = round(df['timestamp(sec)']/24*3600)
and get this:
+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0 |U100|435 |
|1.0 |U100|1091 |
|2.0 |U100|992 |
|3.0 |U100|980 |
|4.0 |U100|288 |
|8.0 |U100|260 |
|9.0 |U100|879 |
|10.0 |U100|875 |
|11.0 |U100|911 |
|13.0 |U100|628 |
|14.0 |U100|642 |
|16.0 |U100|631 |
|17.0 |U100|233 |
... ... ...
|267.0 |U100|1056 |
|269.0 |U100|878 |
|270.0 |U100|256 |
+----------------+----+-----+
Now I noticed that some records' timestamps are missing, and I need to impute those missing data:
timestamp(24hrs)
in continuous ordercount
value by0
Expected output:
+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0 |U100|435 |
|1.0 |U100|1091 |
|2.0 |U100|992 |
|3.0 |U100|980 |
|4.0 |U100|288 |
|5.0 |U100|0 |
|6.0 |U100|0 |
|7.0 |U100|0 |
|8.0 |U100|260 |
|9.0 |U100|879 |
|10.0 |U100|875 |
|11.0 |U100|911 |
|12.0 |U100|0 |
|13.0 |U100|628 |
|14.0 |U100|642 |
|15.0 |U100|0 |
|16.0 |U100|631 |
|17.0 |U100|233 |
... ... ...
|267.0 |U100|1056 |
|268.0 |U100|0 |
|269.0 |U100|878 |
|270.0 |U100|256 |
+----------------+----+-----+
Any idea how can I do this? Based on this answer over standard timestamp, I can imagine I need to create a new column timestamp(24hrs)
from the start and end of the previous one and do left join()
& crossJoin()
but I couldn't manage it yet.
I've tried the following unsuccessfully:
import pyspark.sql.functions as F
all_dates_df = df.selectExpr(
"sequence(min(timestamp(24hrs)), max(timestamp(24hrs)), interval 1 hour) as hour"
).select(F.explode("timestamp(24hrs)").alias("timestamp(24hrs)"))
all_dates_df.show()
result_df = all_dates_df.crossJoin(
df.select("UserName").distinct()
).join(
df,
["count", "timestamp(24hrs)"],
"left"
).fillna(0)
result_df.show()