
With the PySpark (2.1) DataFrame below, how do you use a window function to count the number of times the current record's day of week appeared in the last 28 days?

Example DataFrame:

from pyspark.sql import functions as F
df = sqlContext.createDataFrame([
    ("a", "1", "2018-01-01 12:01:01", "Monday"),
    ("a", "13", "2018-01-01 14:01:01", "Monday"),
    ("a", "22", "2018-01-02 22:01:01", "Tuesday"),
    ("a", "43", "2018-01-08 01:01:01", "Monday"),
    ("a", "43", "2018-01-09 01:01:01", "Tuesday"),
    ("a", "74", "2018-01-10 12:01:01", "Wednesday"),
    ("a", "95", "2018-01-15 06:01:01", "Monday"),
], ["person_id", "other_id", "timestamp", "dow"])


df.withColumn("dow_count",`some window function`)

Possible window

from pyspark.sql import Window
from pyspark.sql import functions as F
Days_28 = 86400 * 28
window = Window.partitionBy("person_id").orderBy("timestamp").rangeBetween(-Days_28, -1)
## I know this next line is wrong
df.withColumn("dow_count",F.sum(F.when(Current_day=windowed_day,1).otherwise(0)).over(window))

Example Output

df.show()

+---------+--------+-------------------+---------+---------+
|person_id|other_id|          timestamp|      dow|dow_count|
+---------+--------+-------------------+---------+---------+
|        a|       1|2018-01-01 12:01:01|   Monday|        0|
|        a|      13|2018-01-01 14:01:01|   Monday|        1|
|        a|      22|2018-01-02 22:01:01|  Tuesday|        0|
|        a|      43|2018-01-08 01:01:01|   Monday|        2|
|        a|      43|2018-01-09 01:01:01|  Tuesday|        1|
|        a|      74|2018-01-10 12:01:01|Wednesday|        0|
|        a|      95|2018-01-15 06:01:01|   Monday|        3|
+---------+--------+-------------------+---------+---------+
Micah Pearce

2 Answers


Use F.row_number() over a window partitioned by (person_id, dow): within each weekday partition, row_number() - 1 is the number of earlier rows with the same weekday. The rangeBetween() logic can be replaced with a where() filter on the timestamp:

from datetime import timedelta, datetime

from pyspark.sql import Window
from pyspark.sql import functions as F

N_days = 28
end = datetime.combine(datetime.today(), datetime.min.time())
start = end - timedelta(days=N_days)

# Number each row within its (person_id, dow) partition; subtracting 1
# gives the count of earlier occurrences of that weekday
window = Window.partitionBy("person_id", "dow").orderBy('timestamp')

df.where((df.timestamp < end) & (df.timestamp >= start)) \
  .withColumn('dow_count', F.row_number().over(window) - 1) \
  .show()
jxc
  • The problem with this is that it compares the date in each row to the current timestamp instead of taking each row and counting back 28 days. – Micah Pearce Jun 07 '18 at 14:46
  • But each row can only hold one `dow_count`, unless you add 27 more columns or rows to your dataframe. Is this what you want? – jxc Jun 07 '18 at 15:03
  • Ah, I see, you were putting a column for each day of week. Each row only needs dow_count for the day of week it falls on, so if the current day is a Monday, I only need dow_count for Mondays in the last 28 days. – Micah Pearce Jun 07 '18 at 16:36
  • I got it now. The partitioning by "dow" helped a lot. I ended up creating a unix_ts, casting it as a long and using a count over a window. – Micah Pearce Jun 07 '18 at 17:43

I figured it out and thought I'd share.

First create a Unix timestamp and cast it to long. Then partition by person and day of week, and use the count function over a range window covering the previous 28 days.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Unix timestamp in seconds, usable with rangeBetween
df = df.withColumn('unix_ts', df.timestamp.cast('timestamp').cast('long'))

# Same weekday for the same person, within the previous 28 days (excluding the current row)
w = Window.partitionBy('person_id', 'dow').orderBy('unix_ts').rangeBetween(-86400 * 28, -1)
df = df.withColumn('dow_count', F.count('unix_ts').over(w))
df.sort(df.unix_ts).show()
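
For reference, here is a minimal end-to-end sketch of the same approach (my own sketch, not part of the original post): it assumes a SparkSession named spark (on 2.1 you can keep using sqlContext.createDataFrame as in the question) and derives dow from the timestamp instead of hard-coding it.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Example data from the question, without the pre-computed dow column
df = spark.createDataFrame([
    ("a", "1", "2018-01-01 12:01:01"),
    ("a", "13", "2018-01-01 14:01:01"),
    ("a", "22", "2018-01-02 22:01:01"),
    ("a", "43", "2018-01-08 01:01:01"),
    ("a", "43", "2018-01-09 01:01:01"),
    ("a", "74", "2018-01-10 12:01:01"),
    ("a", "95", "2018-01-15 06:01:01"),
], ["person_id", "other_id", "timestamp"])

# Derive the weekday name and an epoch-seconds column usable with rangeBetween
df = (df
      .withColumn("dow", F.date_format("timestamp", "EEEE"))
      .withColumn("unix_ts", F.col("timestamp").cast("timestamp").cast("long")))

# Count earlier rows with the same weekday for the same person in the previous 28 days
w = (Window.partitionBy("person_id", "dow")
     .orderBy("unix_ts")
     .rangeBetween(-86400 * 28, -1))

df.withColumn("dow_count", F.count("unix_ts").over(w)).orderBy("unix_ts").show()

With the 28-day range this reproduces the dow_count values shown in the question's expected output.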

Bonus: How to create the actual day of week from the timestamp.

df = df.withColumn("DayOfWeek",F.date_format(df.timestamp, 'EEEE'))

I couldn't have done it without tips from jxc and this Stack Overflow article.

Micah Pearce