
I have a list of event log entries that look like this: (user_id, timestamp). The entries are stored in a Hive table that is already partitioned by the date of the timestamp.
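To keep this self-contained, the relevant types can be sketched roughly like this (a simplified sketch, not my exact classes; the Timestamp field type and the 30-minute constant are assumptions consistent with the code below):

import java.sql.Timestamp

// Simplified sketch of the types used by sessionize below.
case class TrackingEvent(userId: Integer, timestamp: Timestamp) {
    // True if this event happens within 30 minutes of the session's last event.
    def belongsToSession(session: UserSession): Boolean =
        timestamp.getTime - session.end.getTime <= 30 * 60 * 1000
}

class UserSession(val userId: Integer, var start: Timestamp, var end: Timestamp) {
    // Stretch the session to cover one more event.
    def includeEvent(event: TrackingEvent): Unit = { end = event.timestamp }
}

object UserSession {
    def fromEvent(event: TrackingEvent): UserSession =
        new UserSession(event.userId, event.timestamp, event.timestamp)
}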

Now, I want to build sessions out of those events. A session is a collection of consecutive events belonging to a single user; if there is a gap of 30 minutes in a user's activity, I assume a new session starts. So I have a method that looks somewhat like this:

import scala.collection.mutable.{Map => MutableMap, MutableList}

def sessionize(events: List[TrackingEvent]): Map[Integer, List[UserSession]] = {
    // Sort chronologically first; groupBy preserves that order within each user's list.
    val eventsByUser = events.sortWith((a, b) => a.timestamp.before(b.timestamp)).groupBy(_.userId)
    val sessionsByUser: MutableMap[Integer, List[UserSession]] = MutableMap()
    for ((userId, eventList) <- eventsByUser) {
        val sessions: MutableList[UserSession] = MutableList()
        for (event <- eventList) {
            sessions.lastOption match {
                // First event for this user opens the first session.
                case None => sessions += UserSession.fromEvent(event)
                // Within 30 minutes of the current session: extend it.
                case Some(lastSession) if event.belongsToSession(lastSession) => lastSession.includeEvent(event)
                // Otherwise the gap is too big: open a new session.
                case Some(_) => sessions += UserSession.fromEvent(event)
            }
        }
        sessionsByUser(userId) = sessions.toList // store once per user, not once per event
    }
    sessionsByUser.toMap
}
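
For example, with the sketched types above, two events 10 minutes apart end up in one session, and a third event 40 minutes later opens a second one:

val t0 = Timestamp.valueOf("2016-11-10 12:00:00")
val events = List(
    TrackingEvent(1, t0),
    TrackingEvent(1, new Timestamp(t0.getTime + 10 * 60 * 1000)),
    TrackingEvent(1, new Timestamp(t0.getTime + 50 * 60 * 1000)))
sessionize(events)(1).size // == 2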

The problem is that this code needs all the events of a single day to work. That should be fine, because the files are already partitioned that way, yet Spark is still doing a lot of shuffling. Is there a better way to do this?
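For concreteness, this is the direction I'm thinking of: sessionizing each date partition in place with mapPartitions instead of grouping across the cluster. The table name and the column layout here are made up, and I'm assuming each Spark partition holds the events of exactly one day:

// Sketch, not working production code: table name "tracking_events" and the
// (user_id, timestamp) column order are assumptions.
val sessions = spark.table("tracking_events")
    .rdd
    .map(row => TrackingEvent(row.getInt(0), row.getTimestamp(1)))
    // Each partition is sessionized locally; no shuffle should be needed
    // as long as a user's events never cross a partition boundary.
    .mapPartitions(events => sessionize(events.toList).iterator)

What I can't tell is whether Spark will actually preserve the one-day-per-partition layout when reading the Hive table, or whether I need to hint it somehow.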

Thanks!

tdgs
  • It would be easier to help if you could provide a minimal example for us to work with – James Tobin Nov 10 '16 at 18:37
  • Possible duplicate of [How to define partitioning of a Spark DataFrame?](http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe) – raam86 Nov 10 '16 at 22:55

0 Answers