
Sorry for the newbie question.

Currently I have log files that contain fields such as userId, event, and timestamp, but lack a sessionId. My aim is to create a sessionId for each record based on the timestamp and a predefined value TIMEOUT.

If the TIMEOUT value is 10 and the sample DataFrame is:

scala> eventSequence.show(false)

  +----------+------------+----------+ 
|userId    |event       |timestamp |
  +----------+------------+----------+ 
  |U1        |A           |1         | 
  |U2        |B           |2         |
  |U1        |C           |5         |
  |U3        |A           |8         |
  |U1        |D           |20        |
  |U2        |B           |23        |
  +----------+------------+----------+

The goal is (for example, U1's events at timestamps 1 and 5 are within TIMEOUT of each other, so they share session S1, while its event at timestamp 20 comes more than TIMEOUT after 5 and starts a new session S4):

  +----------+------------+----------+----------+
|userId    |event       |timestamp |sessionId |
  +----------+------------+----------+----------+
  |U1        |A           |1         |S1        |
  |U2        |B           |2         |S2        |
  |U1        |C           |5         |S1        |
  |U3        |A           |8         |S3        |
  |U1        |D           |20        |S4        |
  |U2        |B           |23        |S5        |
  +----------+------------+----------+----------+
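
For reference, here is a minimal sketch that reproduces this sample DataFrame in a Spark shell (the construction itself is my own; the column names and values come from the tables above):

import sqlContext.implicits._

// Build the sample eventSequence DataFrame shown above
val eventSequence = Seq(
  ("U1", "A", 1),
  ("U2", "B", 2),
  ("U1", "C", 5),
  ("U3", "A", 8),
  ("U1", "D", 20),
  ("U2", "B", 23)
).toDF("userId", "event", "timestamp")

val TIMEOUT = 10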

I found a solution in R (Create a "sessionID" based on "userID" and differences in "timeStamp"), but I am not able to figure it out in Spark.

Thanks for any suggestions on this problem.

– Torrence
  • Possible duplicate of [How to add a column in a Spark DataFrame?](http://stackoverflow.com/questions/32788322/how-to-add-a-column-in-a-spark-dataframe) – Shawn Guo Dec 31 '15 at 03:20
  • It is not a duplicate of the linked question. The linked question shows "how to add a new column to a DataFrame", while what I need is "how to calculate the new column's value (sessionId, here) in a DataFrame" – Torrence Dec 31 '15 at 04:08

2 Answers


Shawn's answer addresses "how to create a new column", while my aim is "how to create a sessionId column based on the timestamp". After days of struggling, I found that the Window function offers a simple solution in this scenario.

Window functions have been available since Spark 1.4. They provide exactly the kind of operation needed here:

both operate on a group of rows while still returning a single value for every input row

In order to create a sessionId based on the timestamp, I first need to get the difference between each pair of a user's consecutive operations. windowDef defines a Window partitioned by "userId" and ordered by timestamp. diff is a column that, for each row, returns the timestamp of the row one position after the current row in the partition (group), or null if the current row is the last row in its partition.

import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Append ";" to the timestamp whenever the gap to the next event exceeds the timeout
def handleDiff(timeOut: Int) = udf { (timeDiff: Int, timestamp: Int) =>
  if (timeDiff > timeOut) timestamp + ";" else timestamp + ""
}

val windowDef = Window.partitionBy("userId").orderBy("timestamp")
// Timestamp of the next row in the same partition (null on the last row)
val diff: Column = lead(eventSequence("timestamp"), 1).over(windowDef)

val dfTSDiff = eventSequence.
  withColumn("time_diff", diff - eventSequence("timestamp")).
  withColumn("event_seq", handleDiff(TIME_OUT)(col("time_diff"), col("timestamp")))
val dfGrouped = dfTSDiff.
  groupBy("userId").agg(GroupConcat(col("event_seq")).alias("event_seqs"))

Updated: Then use the Window function to apply a "cumsum"-like operation (as provided in Pandas):

// Define a Window partitioned by userId (partitionBy) and ordered by timestamp (orderBy),
// whose frame covers every row from the start of the partition up to and including the current row (rowsBetween)
val windowSpec = Window.partitionBy("userId").orderBy("timestamp").rowsBetween(Long.MinValue, 0)
val sessionDf = dfTSDiff.
  withColumn("ts_diff_flag", genTSFlag(TIME_OUT)(col("time_diff"))).
  select(col("userId"), col("event_seq"), col("timestamp"), sum("ts_diff_flag").over(windowSpec).alias("sessionInteger")).
  withColumn("sessionId", genSessionId(col("userId"), col("sessionInteger")))

Previously: I split by ";" to get each session and create a sessionId, then split by "," and exploded to the final result. The sessionId was thus created with the help of string operations. (This part should be replaced by a cumulative sum operation, which is what the updated code above now does.)
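
As a rough illustration of that string approach on a single user (the event_seqs value below is hypothetical and assumes GroupConcat joins the per-event strings with ","):

// Each ";" marks a session break; split into sessions, then into timestamps
val eventSeqs = "1,5;,20"
val sessions = eventSeqs.split(";").map(_.split(",").filter(_.nonEmpty))
// sessions(0) = Array("1", "5")  -> first session for this user
// sessions(1) = Array("20")      -> second session for this user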

Any ideas or thoughts about this question are welcome.


GroupConcat can be found here: SPARK SQL replacement for mysql GROUP_CONCAT aggregate function

Reference: Databricks introduction

– Torrence

dt.withColumn("sessionId", <expression for the new column sessionId>)
for example:
dt.withColumn("sessionId", dt("timestamp") + TIMEOUT)

– Shawn Guo