
Let's say I have the following Spark frame:

+--------+----------+-----------+-------------------+-------------------+
|UserName|date      |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B       |2021-08-11|2          |2                  |0                  |
|A       |2021-08-11|3          |2                  |1                  |
|B       |2021-08-13|1          |1                  |0                  |
+--------+----------+-----------+-------------------+-------------------+

Now I want to fill in the missing dates in the date column so that the dataframe remains a continuous, evenly spaced time series, and also fill the other columns for those dates with Null or 0 (preferably during the groupBy).

My code is below:

import time
import datetime as dt
from pyspark.sql import SQLContext, functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType

dict2  = [("2021-08-11 04:05:06", "A"),
         ("2021-08-11 04:15:06", "B"),
         ("2021-08-11 09:15:26", "A"),
         ("2021-08-11 11:04:06", "B"),
         ("2021-08-11 14:55:16", "A"),
         ("2021-08-13 04:12:11", "B"),

  ]

schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("UserName",  StringType(), True),
])
 
#create a Spark dataframe
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(data=dict2,schema=schema)
#sdf.printSchema()
#sdf.show(truncate=False)
#+-------------------+--------+
#|timestamp          |UserName|
#+-------------------+--------+
#|2021-08-11 04:05:06|A       |
#|2021-08-11 04:15:06|B       |
#|2021-08-11 09:15:26|A       |
#|2021-08-11 11:04:06|B       |
#|2021-08-11 14:55:16|A       |
#|2021-08-13 04:12:11|B       |
#+-------------------+--------+

#Generate date and timestamp
sdf1 = sdf.withColumn('timestamp',    F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss").cast(TimestampType())) \
            .withColumn('date',         F.to_date("timestamp",      "yyyy-MM-dd").cast(DateType())) \
            .select('timestamp', 'date', 'UserName') 

#sdf1.show(truncate = False)

#+-------------------+----------+--------+
#|timestamp          |date      |UserName|
#+-------------------+----------+--------+
#|2021-08-11 04:05:06|2021-08-11|A       |
#|2021-08-11 04:15:06|2021-08-11|B       |
#|2021-08-11 09:15:26|2021-08-11|A       |
#|2021-08-11 11:04:06|2021-08-11|B       |
#|2021-08-11 14:55:16|2021-08-11|A       |
#|2021-08-13 04:12:11|2021-08-13|B       |
#+-------------------+----------+--------+

#Aggregate record counts per UserName at two time resolutions: per day (24 hrs) and per half day (2 x 12 hrs)
df = sdf1.groupBy("UserName", "date").agg(
    F.sum(F.hour("timestamp").between(0, 23).cast("int")).alias("NoLogPerDay"),
    F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
    F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),

).sort('date')

df.show(truncate = False)

The problem is that when I groupBy on date and UserName, dates on which user B had activity but user A did not (or vice versa) are missing for the inactive user, and dates with no activity at all are missing entirely. I'd like to reflect this inactivity in the Spark dataframe by adding rows for those dates (the timestamp is not needed) and setting the count columns to 0. I'm not sure whether this can be done while grouping, or whether it has to happen before or after.

I have already looked at some related posts, as well as PySpark's window functions, and, inspired by this answer, this is what I've tried so far:

# compute the min and max of the available dates
max_date = sdf1.select(F.max('date')).first()['max(date)']
min_date = sdf1.select(F.min('date')).first()['min(date)']
print(min_date) #2021-08-11
print(max_date) #2021-08-13

#compute list of available dates based on min_date & max_date from available data
dates_list = [max_date - dt.timedelta(days=x) for x in range((max_date - min_date).days +1)]
print(dates_list)

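#create a temporary Spark dataframe for the date column, including missing dates, at 1-day intervals
#(each date is wrapped in a 1-tuple so Spark can infer a single-column schema)
sqlCtx = SQLContext(sc)
df2 = sqlCtx.createDataFrame(data=[(d,) for d in dates_list], schema=["date"])

#Apply leftouter join on date column (against the grouped df, which holds the count columns)
dff = df2.join(df, ["date"], "leftouter")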
#dff.sort('date').show(truncate = False)

#possible to use .withColumn().otherwise()
#.withColumn('date',when(col('date').isNull(),to_date(lit('01.01.1900'),'dd.MM.yyyy')).otherwise(col('date')))

#Replace 0 for null for all integer columns
dfff = dff.na.fill(value=0).sort('date')
         
dfff.select('date','UserName', 'NoLogPerDay','NoLogPer-1st-12-hrs','NoLogPer-2nd-12-hrs').sort('date').show(truncate = False)

Please note that I'm not interested in using a UDF or hacking it via toPandas().

The expected result after the groupBy should look like this:

+--------+----------+-----------+-------------------+-------------------+
|UserName|date      |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B       |2021-08-11|2          |2                  |0                  |
|A       |2021-08-11|3          |2                  |1                  | 
|B       |2021-08-12|0          |0                  |0                  | <--
|A       |2021-08-12|0          |0                  |0                  | <--
|B       |2021-08-13|1          |1                  |0                  |
|A       |2021-08-13|0          |0                  |0                  | <--
+--------+----------+-----------+-------------------+-------------------+
blackbishop
Mario
  • @Steven I checked that, but it doesn't cover this question: it only creates the *line of dates*, which I have already computed here via `dates_list`. It doesn't cover how to impute the *missing dates* or how that imputation affects the other columns when grouping. The idea is, in addition to filling in the missing dates, to record **no activity** where there is no data by putting `Null` or `0` in the Spark frame. So this post goes beyond *generating a date sequence*. – Mario Nov 09 '21 at 16:32
  • I just highlighted this point in the new edit. A further question is whether there is an elegant way to do this during the groupBy and, if not, why it should be done before or after. – Mario Nov 09 '21 at 16:39
  • Your input data do not match: you have `("2021-09-11 04:12:11", "B"),` and `#|2021-08-13 04:12:11|B |`. – Steven Nov 09 '21 at 16:48

2 Answers


Here's one way of doing it:

First, generate a new dataframe, all_dates_df, that contains the sequence of dates from the min to the max date in your grouped df. You can use the sequence function for this:

import pyspark.sql.functions as F

all_dates_df = df.selectExpr(
    "sequence(min(date), max(date), interval 1 day) as date"
).select(F.explode("date").alias("date"))

all_dates_df.show()
#+----------+
#|      date|
#+----------+
#|2021-08-11|
#|2021-08-12|
#|2021-08-13|
#+----------+

Now you need to duplicate each date for every user by cross joining with the dataframe of distinct UserName values, and then left join the result with the grouped df to get the desired output:

result_df = all_dates_df.crossJoin(
    df.select("UserName").distinct()
).join(
    df, 
    ["UserName", "date"],
    "left"
).fillna(0)

result_df.show()
#+--------+----------+-----------+-------------------+-------------------+
#|UserName|      date|NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
#+--------+----------+-----------+-------------------+-------------------+
#|       A|2021-08-11|          3|                  2|                  1|
#|       B|2021-08-11|          2|                  2|                  0|
#|       A|2021-08-12|          0|                  0|                  0|
#|       B|2021-08-12|          0|                  0|                  0|
#|       B|2021-08-13|          1|                  1|                  0|
#|       A|2021-08-13|          0|                  0|                  0|
#+--------+----------+-----------+-------------------+-------------------+
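
As a rough mental model of the two joins above, here is a plain-Python sketch (not Spark code). It assumes the grouped counts are held in a dictionary keyed by (UserName, date); the names counts, scaffold, and filled are made up for illustration. itertools.product plays the role of crossJoin, and dict.get with a default of zeros plays the role of the left join followed by fillna(0).

```python
from datetime import date
from itertools import product

# Grouped counts from the question:
# (UserName, date) -> (NoLogPerDay, NoLogPer-1st-12-hrs, NoLogPer-2nd-12-hrs)
counts = {
    ("B", date(2021, 8, 11)): (2, 2, 0),
    ("A", date(2021, 8, 11)): (3, 2, 1),
    ("B", date(2021, 8, 13)): (1, 1, 0),
}

all_dates = [date(2021, 8, 11), date(2021, 8, 12), date(2021, 8, 13)]
users = sorted({user for user, _ in counts})  # distinct UserName values

# "crossJoin": every date paired with every user
scaffold = [(user, day) for day, user in product(all_dates, users)]

# "left join + fillna(0)": look up the counts, defaulting to zeros when absent
filled = [(user, day, *counts.get((user, day), (0, 0, 0))) for user, day in scaffold]

for row in filled:
    print(row)
```

The scaffold always has (number of users) x (number of dates) rows, which is why the left join can only add zero-filled rows and never drops an existing one.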
blackbishop
  • I liked your *inexpensive* approach, and it is fascinating that it doesn't use `Window.partitionBy()`. Is there a big difference between the *left* `join()` and `crossJoin()` in this scenario? Based on your explanation, `crossJoin()` duplicates the dates for all users. – Mario Nov 15 '21 at 12:37
  • @Mario `crossJoin` is used because you need to associate each possible date in `all_dates_df` with each `UserName` before joining with df. In other words, the cross join does exactly the same thing as what I suggested in my comment on the other answer: `possible_user_dates = df.selectExpr("collect_set(UserName) as users", "sequence(min(date), max(date), interval 1 day) as dates").withColumn("UserName", F.explode("users")).select("UserName", F.explode("dates").alias("date"))` – blackbishop Nov 15 '21 at 14:57

Essentially, you can generate all the possible (UserName, date) combinations and left join your grouped data onto them to fill in the missing dates.

The sequence SQL function is helpful here for generating all the possible dates. You pass it your min and max dates along with the interval to increment by. The following examples continue from the code in your Google Colab notebook.
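
For intuition, sequence(start, stop, interval 1 day) yields every date from start to stop, both ends included, in ascending order. A plain-Python equivalent might look like the sketch below (not Spark code; daily_sequence is a hypothetical helper name, and it assumes start is not later than stop):

```python
import datetime as dt

def daily_sequence(start: dt.date, stop: dt.date) -> list:
    """Return every date from start to stop, both ends included, ascending."""
    n_days = (stop - start).days
    return [start + dt.timedelta(days=offset) for offset in range(n_days + 1)]

dates = daily_sequence(dt.date(2021, 8, 11), dt.date(2021, 8, 13))
print([d.isoformat() for d in dates])
```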

Using the functions min, max, and collect_set together with the table-generating function explode, you can achieve the following:

possible_user_dates=(
    # Step 1 - Get all possible UserNames and desired dates
    df.select(
           F.collect_set("UserName").alias("UserName"),
           F.expr("sequence(min(date),max(date), interval 1 day)").alias("date")
       )
      # Step 2 - Use explode to split the collected arrays into rows (output immediately below)
      .withColumn("UserName",F.explode("UserName"))
      .withColumn("date",F.explode("date"))
      .distinct()
      
)
possible_user_dates.show(truncate=False)
+--------+----------+
|UserName|date      |
+--------+----------+
|B       |2021-08-11|
|A       |2021-08-11|
|B       |2021-08-12|
|A       |2021-08-12|
|B       |2021-08-13|
|A       |2021-08-13|
+--------+----------+

Then perform the left join:

final_df = (
    possible_user_dates.join(
        df,
        ["UserName","date"],
        "left"
    )
    # Since the left join will place NULLs where values are missing. 
    # Eg. where a User was not active on a particular date
    # We use `fill` to replace the null values with `0`
    .na.fill(0)
)

final_df.show(truncate=False)
+--------+----------+-----------+-------------------+-------------------+
|UserName|date      |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B       |2021-08-11|2          |2                  |0                  |
|A       |2021-08-11|3          |2                  |1                  |
|B       |2021-08-12|0          |0                  |0                  |
|A       |2021-08-12|0          |0                  |0                  |
|B       |2021-08-13|1          |1                  |0                  |
|A       |2021-08-13|0          |0                  |0                  |
+--------+----------+-----------+-------------------+-------------------+

For debugging purposes, I've included the output of a few intermediary steps

Step 1 Output:

df.select(
           F.collect_set("UserName").alias("UserName"),
           F.expr("sequence(min(date),max(date), interval 1 day)").alias("date")
       ).show(truncate=False)
+--------+------------------------------------+
|UserName|date                                |
+--------+------------------------------------+
|[B, A]  |[2021-08-11, 2021-08-12, 2021-08-13]|
+--------+------------------------------------+
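
To see why Step 2 turns this single row into every (UserName, date) pair, note that each explode emits one output row per array element while copying the other columns. Exploding two array columns in turn therefore produces their Cartesian product. A plain-Python sketch of that behavior (not Spark code; the explode function here is a hypothetical helper that operates on a list of dicts):

```python
def explode(rows, column):
    """Emit one copy of each row per element of row[column], similar to Spark's explode."""
    return [{**row, column: item} for row in rows for item in row[column]]

# The single row produced by Step 1
step1 = [{
    "UserName": ["B", "A"],
    "date": ["2021-08-11", "2021-08-12", "2021-08-13"],
}]

# Explode UserName first, then date: 2 users x 3 dates = 6 rows
step2 = explode(explode(step1, "UserName"), "date")

for row in step2:
    print(row["UserName"], row["date"])
```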
ggordon
  • You don't need to use Window for this. `possible_user_dates = df.selectExpr("collect_set(UserName) as users", "sequence(min(date), max(date), interval 1 day) as dates").withColumn("UserName", F.explode("users")).select("UserName", F.explode("dates").alias("date"))` – blackbishop Nov 14 '21 at 13:58