
I have raw JSON data like below

{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:46","user_id" : 992210}
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 823323}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 823323}
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:57","user_id" : 978699}

I created a dataframe for the above raw data

val rawDataFrame = sparkSession.read.option("multiline", "true").json(cleanJsonLines)

I need to find out for how many seconds each user was logged in to our system. The expected final result is like below

{"user_id": 978699, "logged_in_sec":8} // (2019-11-20 00:14:47 - 2019-11-20 00:14:46) + (2019-11-20 00:14:57 - 2019-11-20 00:14:50)
{"user_id": 992210, "logged_in_sec":0}
{"user_id": 823323, "logged_in_sec":1}

I am new to Spark and Scala and am not able to figure out how to use a window function for this problem.

I don't want to write procedural code that iterates over each row of the dataframe and calculates the difference between the previous "login" and the current "logout" event per user_id.

Please guide me with an approach to solve this problem. Thanks for reading.

Update:

There is one gotcha in the raw data: login and logout events are not always consecutive. Per user_id, the events can also arrive in the order below; a logout can be followed by further logouts, and similarly a login can be followed by further logins.

{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:46","user_id" : 992210}
{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 823323}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 823323}
{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:55","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:56","user_id" : 978699}    
{"event" : "logout","time" : "2019-11-20 00:14:57","user_id" : 978699}

So, for the requested aggregation: if I get a set of consecutive "logout" events, I have to take the last login from the set of login events just before the first "logout" event and create a window:

login 
login
login <- window start
logout <- window end
logout
login <- window start
logout <- window end
login <- window start
logout <- window end
logout
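
A rough sketch of the cleanup I have in mind (untested; it assumes rawDataFrame from above): keep the last login of each run of consecutive logins and the first logout of each run of consecutive logouts, so that the remaining rows form the login/logout windows shown above.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

val w = Window.partitionBy("user_id").orderBy("time")

val cleaned = rawDataFrame
  .withColumn("prev_event", lag(col("event"), 1).over(w))
  .withColumn("next_event", lead(col("event"), 1).over(w))
  .filter(
    // keep the last login of each run of logins ...
    (col("event") === "login" && (col("next_event").isNull || col("next_event") =!= "login")) ||
    // ... and the first logout of each run of logouts
    (col("event") === "logout" && (col("prev_event").isNull || col("prev_event") =!= "logout"))
  )
  .drop("prev_event", "next_event")

// Note: a leading logout with no earlier login (user 992210) and a trailing login with no
// later logout are still kept here and have to be handled when pairing the rows.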

Update 2

So, following the above approach, the windows for user_id "978699" will be like below

window 1

{"event" : "login","time" : "2019-11-20 00:14:46","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:47","user_id" : 978699}

window 2

{"event" : "login","time" : "2019-11-20 00:14:50","user_id" : 978699}
{"event" : "logout","time" : "2019-11-20 00:14:55","user_id" : 978699}

Sum of logged-in time will be (47-46) + (55-50) = 6 sec


3 Answers


NEW Approach

I added an index for the login and logout events, and I assume that the number of matching logout events cannot exceed the number of login events. Using this assumption, I did the following.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._   // sum, when, unix_timestamp, ...
import spark.implicits._                   // for the $"col" syntax (spark is the SparkSession)

val idPartition = Window.partitionBy("user_id").orderBy("time")

// df is the dataframe of raw events; add a running count of logins and logouts per user
val df2 = df.withColumn("login_index", sum(when($"event" === "login", 1)).over(idPartition))
            .withColumn("logout_index", sum(when($"event" === "logout", 1)).over(idPartition))

df2.show(false)



val login = df2.where($"event" === "login")
                .withColumnRenamed("time", "login_time")
                .drop("logout_index")

login.show(false)

val logout = df2.where($"event" === "logout")
                .withColumnRenamed("time", "logout_time")
                .drop("login_index")

logout.show(false)

val finaldf = login.as("a").join(logout.as("b"), $"a.login_index" === $"b.logout_index"  && $"a.user_id" === $"b.user_id", "inner")
                .withColumn("session_time", unix_timestamp($"b.logout_time") - unix_timestamp($"a.login_time"))
                .select("a.login_time", "b.logout_time", "a.user_id", "a.login_index", "b.logout_index", "session_time")

finaldf.show(false)

val result = finaldf.groupBy("user_id").agg(sum("session_time") as "logged_in_sec")

result.show(false)

The idea is fairly simple even if the code is not. First, compute an index for the login and logout events and check the maximum value of each index per user. You will notice that some users have more logout events than login events; those extra logouts cannot be matched and are dropped by the inner join on equal indexes.

The results of the show() calls, in order, are

+------+-------------------+-------+-----------+------------+
|event |time               |user_id|login_index|logout_index|
+------+-------------------+-------+-----------+------------+
|logout|2019-11-20 00:14:46|992210 |null       |1           |
|login |2019-11-20 00:14:46|978699 |1          |null        |
|logout|2019-11-20 00:14:47|978699 |1          |1           |
|login |2019-11-20 00:14:50|978699 |2          |1           |
|logout|2019-11-20 00:14:55|978699 |2          |2           |
|logout|2019-11-20 00:14:56|978699 |2          |3           |
|logout|2019-11-20 00:14:57|978699 |2          |4           |
|login |2019-11-20 00:14:46|823323 |1          |null        |
|logout|2019-11-20 00:14:47|823323 |1          |1           |
+------+-------------------+-------+-----------+------------+

+-----+-------------------+-------+-----------+
|event|login_time         |user_id|login_index|
+-----+-------------------+-------+-----------+
|login|2019-11-20 00:14:46|978699 |1          |
|login|2019-11-20 00:14:50|978699 |2          |
|login|2019-11-20 00:14:46|823323 |1          |
+-----+-------------------+-------+-----------+

+------+-------------------+-------+------------+
|event |logout_time        |user_id|logout_index|
+------+-------------------+-------+------------+
|logout|2019-11-20 00:14:46|992210 |1           |
|logout|2019-11-20 00:14:47|978699 |1           |
|logout|2019-11-20 00:14:55|978699 |2           |
|logout|2019-11-20 00:14:56|978699 |3           |
|logout|2019-11-20 00:14:57|978699 |4           |
|logout|2019-11-20 00:14:47|823323 |1           |
+------+-------------------+-------+------------+

+-------------------+-------------------+-------+-----------+------------+------------+
|login_time         |logout_time        |user_id|login_index|logout_index|session_time|
+-------------------+-------------------+-------+-----------+------------+------------+
|2019-11-20 00:14:46|2019-11-20 00:14:47|978699 |1          |1           |1           |
|2019-11-20 00:14:50|2019-11-20 00:14:55|978699 |2          |2           |5           |
|2019-11-20 00:14:46|2019-11-20 00:14:47|823323 |1          |1           |1           |
+-------------------+-------------------+-------+-----------+------------+------------+

+-------+-------------+
|user_id|logged_in_sec|
+-------+-------------+
|978699 |6            |
|823323 |1            |
+-------+-------------+

The last output is the final result.
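
As a quick side check (a sketch, not part of the original answer; it assumes df2 and the imports above), comparing the per-user maximum indexes shows which logout events have no matching login and are therefore dropped by the inner join:

// Per-user maximum login/logout indexes; any logout_index beyond max_login_index
// belongs to a logout without a matching login (e.g. the extra logouts of 978699
// and the lone logout of 992210, whose max_login_index is null).
val indexCheck = df2.groupBy("user_id")
  .agg(max("login_index").as("max_login_index"),
       max("logout_index").as("max_logout_index"))
indexCheck.show(false)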

Lamanus
  • I think it is really hard to recognize which logout belongs to which login. If there were a unique session id, it would be easier. – Lamanus Nov 24 '19 at 12:50
  • Yes, you are right, having a session identifier would help in deciding the exact login. But we don't have that in our raw data. So to make the problem solvable we decided to consider the last login before the first "logout" as our required "login". I mentioned an example as an update in the description of the question. I have to somehow clean the data using the above assumption and create another dataframe with clean data, with consecutive login and logout events. – Mayank Prajapati Nov 24 '19 at 13:01
  • That was an interesting approach again. But my apologies again, I think I didn't explain the question properly. We have to find the windows per user_id and then sum the durations of those windows. I updated this in the question under the heading "Update 2". I am thinking of transforming the raw data as in the post below and then using your old approach to get the final result. What do you say? https://stackoverflow.com/questions/52146821/drop-consecutive-duplicates-in-a-pyspark-dataframe – Mayank Prajapati Nov 24 '19 at 14:10

Here you go,

**source extraction**
val rawDataFrame = spark.read.format("json").load("../cleanjsonLines.json")
rawDataFrame.printSchema

root
 |-- event: string (nullable = true)
 |-- time: string (nullable = true)
 |-- user_id: long (nullable = true)

**casting to timestamp**
val tfDataFrame = rawDataFrame.selectExpr("event","to_timestamp(time) as time","user_id")
tfDataFrame.printSchema

root
 |-- event: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- user_id: long (nullable = true)

**creating temp view**
tfDataFrame.createOrReplaceTempView("SysEvent")

**Creating windowed temp view for valid sessions**
spark.sql("select * from (select *,lag(event,-1) over (partition by user_id  order by time) as next_event, lag(time,-1) over (partition by user_id order by time) as next_time from SysEvent) a where event = 'login' and next_event = 'logout' order by user_id,time").createOrReplaceTempView("WindowSysEvent")
spark.sql("select * from WindowSysEvent").show()

Result for source dataset:    
+-----+-------------------+-------+----------+-------------------+
|event|               time|user_id|next_event|          next_time|
+-----+-------------------+-------+----------+-------------------+
|login|2019-11-20 00:14:46| 823323|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:46| 978699|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:50| 978699|    logout|2019-11-20 00:14:57|
+-----+-------------------+-------+----------+-------------------+

Result for updated dataset:
+-----+-------------------+-------+----------+-------------------+
|event|               time|user_id|next_event|          next_time|
+-----+-------------------+-------+----------+-------------------+
|login|2019-11-20 00:14:46| 823323|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:46| 978699|    logout|2019-11-20 00:14:47|
|login|2019-11-20 00:14:50| 978699|    logout|2019-11-20 00:14:55|
+-----+-------------------+-------+----------+-------------------+

**aggregation for valid sessions**
val result = spark.sql("select user_id, sum(unix_timestamp(next_time) - unix_timestamp(time)) as logged_in_sec from WindowSysEvent group by user_id")
result.show()

Result for source dataset:
+-------+-------------+
|user_id|logged_in_sec|
+-------+-------------+
| 978699|            8|
| 823323|            1|
+-------+-------------+

Result for updated dataset:
+-------+-------------+
|user_id|logged_in_sec|
+-------+-------------+
| 978699|            6|
| 823323|            1|
+-------+-------------+

**write to target**
result.coalesce(1).write.format("json").save("../result.json")

Result for source dataset:
{"user_id":823323,"logged_in_sec":1}
{"user_id":978699,"logged_in_sec":8}

Result for updated dataset:
{"user_id":823323,"logged_in_sec":1}
{"user_id":978699,"logged_in_sec":6}

result.explain
== Physical Plan ==
*(5) HashAggregate(keys=[user_id#184L], functions=[sum((unix_timestamp(next_time#898, yyyy-MM-dd HH:mm:ss, Some(..)) - unix_timestamp(time#188, yyyy-MM-dd HH:mm:ss, Some(..))))])
+- Exchange hashpartitioning(user_id#184L, 200)
   +- *(4) HashAggregate(keys=[user_id#184L], functions=[partial_sum((unix_timestamp(next_time#898, yyyy-MM-dd HH:mm:ss, Some(..)) - unix_timestamp(time#188, yyyy-MM-dd HH:mm:ss, Some(..))))])
      +- *(4) Sort [user_id#184L ASC NULLS FIRST, time#188 ASC NULLS FIRST], true, 0
         +- Exchange rangepartitioning(user_id#184L ASC NULLS FIRST, time#188 ASC NULLS FIRST, 200)
            +- *(3) Project [time#188, user_id#184L, next_time#898]
               +- *(3) Filter (((isnotnull(event#182) && isnotnull(next_event#897)) && (event#182 = login)) && (next_event#897 = logout))
                  +- Window [lag(event#182, -1, null) windowspecdefinition(user_id#184L, time#188 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS next_event#897, lag(time#188, -1, null) windowspecdefinition(user_id#184L, time#188 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS next_time#898], [user_id#184L], [time#188 ASC NULLS FIRST]
                     +- *(2) Sort [user_id#184L ASC NULLS FIRST, time#188 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(user_id#184L, 200)
                           +- *(1) Project [event#182, cast(time#183 as timestamp) AS time#188, user_id#184L]
                              +- *(1) FileScan json [event#182,time#183,user_id#184L] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:../cleanjsonLines.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<event:string,time:string,user_id:bigint>
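
For reference, a rough DataFrame-API equivalent of the SQL above (a sketch, not part of the original answer; it assumes tfDataFrame from the casting step, and uses lead(col, 1), which is equivalent to lag(col, -1)):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead, sum, unix_timestamp}

val byUser = Window.partitionBy("user_id").orderBy("time")

// pair each login with the event that immediately follows it for the same user
val sessions = tfDataFrame
  .withColumn("next_event", lead(col("event"), 1).over(byUser))
  .withColumn("next_time", lead(col("time"), 1).over(byUser))
  .filter(col("event") === "login" && col("next_event") === "logout")

// sum the session lengths per user
val resultDF = sessions
  .groupBy("user_id")
  .agg(sum(unix_timestamp(col("next_time")) - unix_timestamp(col("time"))).as("logged_in_sec"))

resultDF.show()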
Aravind Palani
  • As I was worried, this will fail with data like below: {"user_id":671041,"event":"logout","time":"2019-11-20 00:18:16"} {"user_id":671041,"event":"login","time":"2019-11-20 00:18:36"} {"user_id":671041,"event":"logout","time":"2019-11-20 00:18:45"} {"user_id":671041,"event":"logout","time":"2019-11-20 00:30:00"} – Mayank Prajapati Nov 24 '19 at 16:53
  • Aggregation using the above code will give {"login_time":"2019-11-20 00:18:36","logout_time":"2019-11-20 00:18:16","user_id":671041,"session_time":-20}; the correct result should be {"login_time":"2019-11-20 00:18:36","logout_time":"2019-11-20 00:18:45","user_id":671041,"session_time":9}. Please correct me if I am wrong. – Mayank Prajapati Nov 24 '19 at 16:54
  • I get your scenario. Shall I consider users only if they have a valid session (login and logout)? E.g. 992210 will not be available in the result dataframe since we have only a logout event. – Aravind Palani Nov 24 '19 at 19:00
  • Yes, that is the correct assumption. Anyway, can you review the solution I updated in the question description? Please let me know in case of any bugs or if you think anything can be optimized further. – Mayank Prajapati Nov 24 '19 at 19:03
  • I've updated my answer and also attached the physical plan. Let me know if you find it more efficient. – Aravind Palani Nov 24 '19 at 20:25

Raw Data

{"user_id":346214,"event":"logout","time":"2019-11-20 00:19:41"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:19:43"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:22:09"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:22:12"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:24:12"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:24:14"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:25:43"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:25:45"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:29:55"}
{"user_id":346214,"event":"login","time":"2019-11-20 00:29:57"}
{"user_id":346214,"event":"logout","time":"2019-11-20 00:30:00"}


import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// create a dataframe with only login events, sorted by user_id and time
val leftDF = rawDataFrame.filter(col("event") === lit("login")).orderBy("user_id", "time")
leftDF.show()

// create a dataframe with only logout events, sorted by user_id and time
val rightDF = rawDataFrame.filter(col("event") === lit("logout")).orderBy("user_id", "time")
rightDF.show()

// join the login and logout dataframes such that the logoutDF row time is greater than or equal to the loginDF row time,
// then keep the earliest such logout per login
val joinedDF = leftDF.as("loginDF")
  .join(rightDF.as("logoutDF"),
    col("logoutDF.time") >= col("loginDF.time")
      &&
      col("loginDF.user_id") === col("logoutDF.user_id"),"left")
  .orderBy("loginDF.user_id","loginDF.time","logoutDF.time")
  .groupBy(col("loginDF.user_id").as("user_id"),col("loginDF.time").as("login"))
  .agg(first("logoutDF.time").as("logout"))
  .orderBy("user_id","login","logout")
// this creates login/logout candidate pairs per user; overlapping pairs still have to be merged.
// With the group/window columns added by the overlap-removal step below, the data looks like this:

{"user_id":346214,"login":"2019-11-20 00:25:45","logout":"2019-11-20 00:29:55","group_id":4,"updated_login":"2019-11-20 00:25:45","update_logout":"2019-11-20 00:29:55","session_time":250}
{"user_id":346214,"login":"2019-11-20 00:24:14","logout":"2019-11-20 00:25:43","group_id":3,"updated_login":"2019-11-20 00:24:14","update_logout":"2019-11-20 00:25:43","session_time":89}
{"user_id":346214,"login":"2019-11-20 00:29:57","logout":"2019-11-20 00:30:00","group_id":5,"updated_login":"2019-11-20 00:29:57","update_logout":"2019-11-20 00:30:00","session_time":3}
{"user_id":346214,"login":"2019-11-20 00:22:12","logout":"2019-11-20 00:24:12","group_id":2,"updated_login":"2019-11-20 00:22:12","update_logout":"2019-11-20 00:24:12","session_time":120}
{"user_id":346214,"login":"2019-11-20 00:19:43","logout":"2019-11-20 00:22:09","group_id":1,"updated_login":"2019-11-20 00:19:43","update_logout":"2019-11-20 00:22:09","session_time":146}

// to remove the overlap, I followed this post
https://stackoverflow.com/questions/52877237/in-spark-scala-how-to-check-overlapping-dates-from-adjacent-rows-in-a-dataframe/52881823

val win1 = Window.partitionBy(col("user_id")).orderBy(col("login"), col("logout"))
val win2 = Window.partitionBy(col("user_id"), col("group_id"))
val finalDF = joinedDF.
  withColumn("group_id", when(
    col("login").between(lag(col("login"), 1).over(win1), lag(col("logout"), 1).over(win1)), null
  ).otherwise(monotonically_increasing_id)
  ).
  withColumn("group_id", last(col("group_id"), ignoreNulls=true).
    over(win1.rowsBetween(Window.unboundedPreceding, 0))
  ).
  withColumn("updated_login", min(col("login")).over(win2)).
  withColumn("update_logout", max(col("logout")).over(win2)).
  orderBy("user_id", "login", "logout")
  .dropDuplicates(Seq("user_id", "updated_login", "update_logout"))
  .withColumn("session_time", unix_timestamp(col("update_logout")) - unix_timestamp(col("updated_login")))

val result = finalDF.groupBy("user_id").agg((sum("session_time") / 60) as "logged_in_min").filter(col("logged_in_min").isNotNull)
result.coalesce(1).write.format("json").mode(SaveMode.Overwrite).save("../final_result.json")

// this will generate data like below
{"user_id":346214,"logged_in_min":10.133333333333333}
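
If the output should instead match the logged_in_sec format requested in the question, a small variant of the final aggregation (a sketch, assuming finalDF from above):

// same aggregation, but in seconds to match the requested output shape
val resultSec = finalDF.groupBy("user_id")
  .agg(sum("session_time") as "logged_in_sec")
  .filter(col("logged_in_sec").isNotNull)
resultSec.show()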