
I'm trying to calculate the number of days between a user's first use of an application and the event each df row represents. The code below creates a column comparing each row to the previous row, but I need each row compared to the first row of its partition.

from pyspark.sql import Window
from pyspark.sql.functions import datediff, lag

window = Window.partitionBy('userId').orderBy('dateTime')

# compares each row to the previous row, not to the first row of the partition
df = df.withColumn("daysPassed", datediff(df.dateTime,
                                          lag(df.dateTime, 1).over(window)))

Tried "int(Window.unboundedPreceding)" in place of 1, which threw an error.

Example of what I'd like the daysPassed column to do:

 Row(userId='59', page='NextSong', dateTime='2018-10-01', daysPassed=0),
 Row(userId='59', page='NextSong', dateTime='2018-10-03', daysPassed=2),
 Row(userId='59', page='NextSong', dateTime='2018-10-04', daysPassed=3)
fakewalls

1 Answer


So, if I'm understanding correctly, you essentially want to calculate the difference between each row's date and the user's minimum date (their start date), not the lag().

from pyspark.sql import Window
from pyspark.sql import functions as func

window = Window.partitionBy('userId')

# min(dateTime) over the partition is each user's first date
df_b = df_a.withColumn("daysPassed",
                       func.datediff(df_a.dateTime, func.min(df_a.dateTime).over(window)))

This calculates the number of days since the first date the user started using the app.
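As a quick check, here is a minimal, self-contained sketch using the sample rows from the question (the SparkSession setup is an assumption, not part of the original answer):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as func

spark = SparkSession.builder.getOrCreate()

df_a = spark.createDataFrame(
    [('59', 'NextSong', '2018-10-01'),
     ('59', 'NextSong', '2018-10-03'),
     ('59', 'NextSong', '2018-10-04')],
    ['userId', 'page', 'dateTime'])

window = Window.partitionBy('userId')
df_b = df_a.withColumn("daysPassed",
                       func.datediff(df_a.dateTime, func.min(df_a.dateTime).over(window)))

# daysPassed comes out as 0, 2, 3 for the three rows, matching the
# expected output in the question
df_b.show()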

samkart
  • Getting: TypeError: Column is not iterable – fakewalls Dec 10 '19 at 04:49
  • use `from pyspark.sql import functions as func` and then `... func.min(df.dateTime).over(window)))` – samkart Dec 10 '19 at 05:03
  • Thank you. Unfortunately, now I'm running into another error. I will do some digging on my end. Error: AnalysisException: "grouping expressions sequence is empty, and '`artist`' is not an aggregate function. Wrap '(min(`dateTime`) AS `_w0`)' in windowing function(s) or wrap '`artist`' in first() (or first_value) if you don't care which value you get. – fakewalls Dec 10 '19 at 05:18
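(A note on that last AnalysisException, since the thread leaves it unresolved: Spark raises that message when an aggregate such as min() reaches the analyzer without being attached to a window. The likely culprit, though this is an assumption, is a func.min(...) call that never got its .over(window), or a window variable that is not a WindowSpec built with Window.partitionBy(...). A short sketch, reusing df_a from above:)

from pyspark.sql import Window
from pyspark.sql import functions as func

# raises "grouping expressions sequence is empty ..." because the
# aggregate has no window attached (illustrative, left commented out):
# df_a.withColumn("daysPassed", func.datediff(df_a.dateTime, func.min(df_a.dateTime)))

# attaching .over() with a proper WindowSpec resolves it:
window = Window.partitionBy('userId')
df_b = df_a.withColumn("daysPassed",
                       func.datediff(df_a.dateTime,
                                     func.min(df_a.dateTime).over(window)))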