I have raw JSON data like the following:
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:46","user_id" : 992210}
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 823323}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 823323}
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:57","user_id" : 978699}
I created a DataFrame from the raw data above:
val rawDataFrame = sparkSession.read.option("multiline", "true").json(cleanJsonLines)
I need to find out how many seconds each user was logged in to our system. The expected final result looks like this:
{"user_id": 978699, "logged_in_sec":8} // (2019-11-20 00:14:47 - 2019-11-20 00:14:46) + (2019-11-20 00:14:57 - 2019-11-20 00:14:50)
{"user_id": 992210, "logged_in_sec":0}
{"user_id": 823323, "logged_in_sec":1}
I am new to Spark and Scala and cannot figure out how to use a window function for this problem.
I don't want to write procedural code that iterates over each row of the DataFrame and calculates the difference between the previous "login" and the current "logout" event per user_id.
Please suggest an approach to solve this problem. Thanks for reading.
Update:
There is one gotcha in the raw data: login and logout events do not always alternate. For a given user_id, a logout can follow one or more other logouts, and a login can follow one or more other logins, so the events can also arrive in an order like this:
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:46","user_id" : 992210}
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 823323}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 823323}
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:55","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:56","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:57","user_id" : 978699}
So, for the requested aggregation, each window starts at the last "login" in a run of consecutive logins and ends at the first "logout" in the run of consecutive logouts that follows. Any other events in those runs are ignored:
login
login
login <- window start
logout <- window end
logout
login <- window start
logout <- window end
login <- window start
logout <- window end
logout
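The pairing rule above might be expressed with a window function roughly as follows: order each user's events by time, look one row ahead with lead, and count a window only where a "login" is immediately followed by a "logout". This is only a sketch under assumptions, not a confirmed solution. The object name LoggedInSeconds, the helper loggedInSeconds, and the columns ts, next_event, next_ts and session_sec are made up for illustration. It also assumes that events sharing the same second for one user can be ordered by the event name, which puts "login" before "logout".

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LoggedInSeconds {

  // Hypothetical helper: expects columns event (string), time (string), user_id.
  def loggedInSeconds(events: DataFrame): DataFrame = {
    // Assumption: timestamps use the format shown in the sample data.
    val withTs = events.withColumn("ts", unix_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss"))

    // Order each user's events by time; ties are broken by event name,
    // so "login" sorts before "logout" (an assumption, not from the data).
    val byUser = Window.partitionBy(col("user_id")).orderBy(col("ts"), col("event"))

    withTs
      .withColumn("next_event", lead(col("event"), 1).over(byUser))
      .withColumn("next_ts", lead(col("ts"), 1).over(byUser))
      // A window exists only where a login is directly followed by a logout.
      // That row is the last login of its run, and the next row is the first
      // logout of the following run; every other row contributes 0.
      .withColumn(
        "session_sec",
        when((col("event") === "login") && (col("next_event") === "logout"),
          col("next_ts") - col("ts")).otherwise(lit(0L)))
      .groupBy(col("user_id"))
      .agg(sum(col("session_sec")).as("logged_in_sec"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("logged-in-seconds")
      .getOrCreate()
    import spark.implicits._

    // Sample events from the update, including the repeated logouts.
    val events = Seq(
      ("login", "2019-11-20 00:14:46", 978699L),
      ("logout", "2019-11-20 00:14:46", 992210L),
      ("login", "2019-11-20 00:14:46", 823323L),
      ("logout", "2019-11-20 00:14:47", 978699L),
      ("logout", "2019-11-20 00:14:47", 823323L),
      ("login", "2019-11-20 00:14:50", 978699L),
      ("logout", "2019-11-20 00:14:55", 978699L),
      ("logout", "2019-11-20 00:14:56", 978699L),
      ("logout", "2019-11-20 00:14:57", 978699L)
    ).toDF("event", "time", "user_id")

    loggedInSeconds(events).show()
    spark.stop()
  }
}
```

Because rows that do not start a window contribute 0 instead of being filtered out, a user with only "logout" events still appears in the result with logged_in_sec = 0.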
Update 2:
Following the above approach, the windows for user_id 978699 will be as follows:
window 1
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
window 2
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:55","user_id" : 978699}
The total logged-in time will be (47 - 46) + (55 - 50) = 6 sec.
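The arithmetic for these two windows can be checked in plain Scala without Spark. This is just a small sketch. The object name WindowArithmetic and the helper seconds are hypothetical, and the timestamps are copied from the two windows listed above.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object WindowArithmetic {
  // Same timestamp format as the raw JSON events.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Seconds elapsed between a login time and a logout time.
  def seconds(login: String, logout: String): Long =
    Duration.between(LocalDateTime.parse(login, fmt), LocalDateTime.parse(logout, fmt)).getSeconds

  // (login, logout) pairs for window 1 and window 2 of user_id 978699.
  val windows: Seq[(String, String)] = Seq(
    ("2019-11-20 00:14:46", "2019-11-20 00:14:47"),
    ("2019-11-20 00:14:50", "2019-11-20 00:14:55")
  )

  val total: Long = windows.map { case (in, out) => seconds(in, out) }.sum

  def main(args: Array[String]): Unit =
    println(total) // 1 + 5 = 6
}
```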