I have a list of event log entries that look like this: (user_id, timestamp). The entries are stored in a Hive table, which is already partitioned by the date of the timestamp.
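For reference, the table layout is roughly this (names are simplified and hypothetical; the real table has more columns):

// Hypothetical sketch of the table layout, expressed as a Spark SQL statement
spark.sql("""
  CREATE TABLE IF NOT EXISTS tracking_events (
    user_id     INT,
    `timestamp` TIMESTAMP
  )
  PARTITIONED BY (`date` STRING)
  STORED AS PARQUET
""")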
Now I want to create sessions out of those events. A session is a collection of events that belong to a single user; if there is a gap of 30 minutes in that user's activity, I assume a new session starts. So I have a method that looks somewhat like this:
import scala.collection.mutable.{MutableList, Map => MutableMap}

def sessionize(events: List[TrackingEvent]): Map[Integer, List[UserSession]] = {
  // Sort globally by timestamp first, so each user's events stay in chronological order after grouping
  val eventsByUser = events.sortWith((a, b) => a.timestamp.before(b.timestamp)).groupBy(_.userId)
  val sessionsByUser: MutableMap[Integer, List[UserSession]] = MutableMap()
  for ((userId, eventList) <- eventsByUser) {
    val sessions: MutableList[UserSession] = MutableList()
    for (event <- eventList) {
      sessions.lastOption match {
        // First event for this user opens a new session
        case None => sessions += UserSession.fromEvent(event)
        // Event within 30 minutes of the current session: extend it
        case Some(lastSession) if event.belongsToSession(lastSession) => lastSession.includeEvent(event)
        // Gap of more than 30 minutes: start a new session
        case Some(_) => sessions += UserSession.fromEvent(event)
      }
    }
    sessionsByUser(userId) = sessions.toList
  }
  sessionsByUser.toMap
}
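For completeness, here is a simplified sketch of the event and session classes I'm using (the real ones carry more fields, but this captures the 30-minute rule):

import java.sql.Timestamp

case class TrackingEvent(userId: Integer, timestamp: Timestamp) {
  // An event belongs to a session if it falls within 30 minutes of the session's last event
  def belongsToSession(session: UserSession): Boolean =
    timestamp.getTime - session.end.getTime <= UserSession.SessionGapMillis
}

class UserSession(var start: Timestamp, var end: Timestamp) {
  // Extend the session so it covers this event as well
  def includeEvent(event: TrackingEvent): Unit =
    if (event.timestamp.after(end)) end = event.timestamp
}

object UserSession {
  val SessionGapMillis: Long = 30 * 60 * 1000

  def fromEvent(event: TrackingEvent): UserSession =
    new UserSession(event.timestamp, event.timestamp)
}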
The problem is that this code needs all the events of a single day to work, but that should be fine, because the files are already partitioned by date. Nevertheless, Spark is still doing a lot of shuffling. Is there a better way to do this?
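For context, the Spark side currently looks roughly like this (again with simplified names); I assume the groupByKey is where the shuffle comes from:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

val sessions = spark.sql("SELECT user_id, `timestamp`, `date` FROM tracking_events")
  .rdd
  .map(row => (row.getString(2), TrackingEvent(row.getInt(0), row.getTimestamp(1))))
  // Group all of a day's events together so sessionize sees the full day;
  // this repartitions by date even though the files on disk are already laid out that way
  .groupByKey()
  .flatMap { case (_, dayEvents) => sessionize(dayEvents.toList) }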
Thanks!