Let's say I have the following Spark frame:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-13|1 |1 |0 |
+--------+----------+-----------+-------------------+-------------------+
Now I want not only to impute the missing dates in the date column with the right dates, so that the dataframe keeps its continuous, equally-spaced time-series nature, but also to impute the other columns with Null or 0 (preferably while doing the groupBy).
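To make "continuous" concrete: I want one row per calendar day between the earliest and latest date in the data, for every user. A rough sketch of just the calendar part I have in mind (my own sketch, assuming Spark 2.4+ for F.sequence; df stands for the aggregated frame shown above, built by the code further down):
#sketch only: build one row per day between the earliest and latest date
bounds = df.agg(F.min("date").alias("min_d"), F.max("date").alias("max_d"))
calendar = bounds.select(F.explode(F.sequence("min_d", "max_d")).alias("date"))
#sequence() over two date columns steps by one day, so 'calendar' covers every day in between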
My code is below:
import time
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType
dict2 = [("2021-08-11 04:05:06", "A"),
         ("2021-08-11 04:15:06", "B"),
         ("2021-08-11 09:15:26", "A"),
         ("2021-08-11 11:04:06", "B"),
         ("2021-08-11 14:55:16", "A"),
         ("2021-08-13 04:12:11", "B"),
         ]
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("UserName", StringType(), True),
])
#create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=dict2, schema=schema)
#sdf.printSchema()
#sdf.show(truncate=False)
#+-------------------+--------+
#|timestamp |UserName|
#+-------------------+--------+
#|2021-08-11 04:05:06|A |
#|2021-08-11 04:15:06|B |
#|2021-08-11 09:15:26|A |
#|2021-08-11 11:04:06|B |
#|2021-08-11 14:55:16|A |
#|2021-08-13 04:12:11|B |
#+-------------------+--------+
#Generate date and timestamp
sdf1 = sdf.withColumn('timestamp', F.to_timestamp('timestamp', 'yyyy-MM-dd HH:mm:ss')) \
          .withColumn('date', F.to_date('timestamp')) \
          .select('timestamp', 'date', 'UserName')
#sdf1.show(truncate = False)
#+-------------------+----------+--------+
#|timestamp |date |UserName|
#+-------------------+----------+--------+
#|2021-08-11 04:05:06|2021-08-11|A |
#|2021-08-11 04:15:06|2021-08-11|B |
#|2021-08-11 09:15:26|2021-08-11|A |
#|2021-08-11 11:04:06|2021-08-11|B |
#|2021-08-11 14:55:16|2021-08-11|A |
#|2021-08-13 04:12:11|2021-08-13|B |
#+-------------------+----------+--------+
#Aggregate record counts per UserName at daily (24 hrs) and half-day (2 x 12 hrs) resolution
df = sdf1.groupBy("UserName", "date").agg(
    F.sum(F.hour("timestamp").between(0, 24).cast("int")).alias("NoLogPerDay"),
    F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
    F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),
).sort('date')
df.show(truncate = False)
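(As an aside, I believe NoLogPerDay is really just a row count, since hour() always falls in 0-23; if I'm not mistaken this is an equivalent, slightly simpler aggregation, though I've kept the original above:)
#equivalent aggregation, to the best of my understanding: count all rows per day,
#and split the day at noon for the two 12-hour buckets
df_alt = sdf1.groupBy("UserName", "date").agg(
    F.count("timestamp").alias("NoLogPerDay"),
    F.sum((F.hour("timestamp") < 12).cast("int")).alias("NoLogPer-1st-12-hrs"),
    F.sum((F.hour("timestamp") >= 12).cast("int")).alias("NoLogPer-2nd-12-hrs"),
).sort('date')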
The problem is that after I groupBy on date and UserName, some (UserName, date) combinations are simply absent: dates on which user B had activity but user A did not, or vice versa, produce no row at all. So I'm interested in reflecting these no-activity days in the Spark dataframe by filling in those dates (no timestamp needed) and allocating 0 to those columns. I'm not sure whether I can do this while grouping, or before, or after!
I already checked some related posts as well as the window functions PySpark offers, and was inspired by this answer, so this is what I've tried so far:
# compute the list of all dates from available dates
max_date = sdf1.select(F.max('date')).first()['max(date)']
min_date = sdf1.select(F.min('date')).first()['min(date)']
print(min_date) #2021-08-11
print(max_date) #2021-08-13
#compute the list of all dates between min_date & max_date
dates_list = [max_date - dt.timedelta(days=x) for x in range((max_date - min_date).days +1)]
print(dates_list)
#create a temporary Spark dataframe for the date column, including the missing dates at a 1-day interval
df2 = spark.createDataFrame([(d,) for d in dates_list], schema=['date'])
#Apply a left outer join with the aggregated frame on the date column
dff = df2.join(df, ["date"], "leftouter")
#dff.sort('date').show(truncate = False)
#possible to use .withColumn().otherwise()
#.withColumn('date',when(col('date').isNull(),to_date(lit('01.01.1900'),'dd.MM.yyyy')).otherwise(col('date')))
#Replace null with 0 in all the integer (count) columns
dfff = dff.na.fill(value=0).sort('date')
dfff.select('date', 'UserName', 'NoLogPerDay', 'NoLogPer-1st-12-hrs', 'NoLogPer-2nd-12-hrs').sort('date').show(truncate = False)
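What I suspect my attempt is missing is a scaffold of every (UserName, date) combination rather than just every date. The shape I'm imagining is roughly the sketch below (variable names like calendar/scaffold are mine, and this may well not be the idiomatic way, which is exactly what I'm asking):
#sketch: one row for every (UserName, date) pair over the full date range,
#then left-join the aggregated counts and zero-fill the gaps
calendar = (df.agg(F.min("date").alias("min_d"), F.max("date").alias("max_d"))
              .select(F.explode(F.sequence("min_d", "max_d")).alias("date")))
users = df.select("UserName").distinct()
scaffold = calendar.crossJoin(users)
result = (scaffold.join(df, ["UserName", "date"], "left")
                  .na.fill(0)
                  .sort("date", "UserName"))
#result.show(truncate = False)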
Please note that I'm not interested in using a UDF or hacking it via toPandas(), so the expected results after groupBy should look like below:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-12|0 |0 |0 | <--
|A |2021-08-12|0 |0 |0 | <--
|B |2021-08-13|1 |1 |0 |
|A |2021-08-13|0 |0 |0 | <--
+--------+----------+-----------+-------------------+-------------------+