I'd like to preface this question by saying I'm a Spark noob (I just started reading a book four days ago). Nevertheless, I'm trying to port over something I wrote with the help of the pandas library in Python so that I can take advantage of the cluster we just spun up. The data in the pandas DataFrame df looks like this:
+---------------------+-----------+-------+-------------+----------------------+
| TimeStamp | Customer | User | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 | NaN |
| 2017-01-01 00:01:01 | customer1 | user2 | app2 | NaN |
| 2017-01-01 00:02:01 | customer1 | user1 | app2 | NaN |
| 2017-01-01 00:03:01 | customer1 | user1 | app1 | NaN |
+---------------------+-----------+-------+-------------+----------------------+
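For reference, the sample frame can be built roughly like this (a minimal sketch; the dtypes are just my reading of the table above):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    'TimeStamp': pd.to_datetime(['2017-01-01 00:00:01',
                                 '2017-01-01 00:01:01',
                                 '2017-01-01 00:02:01',
                                 '2017-01-01 00:03:01']),
    'Customer': ['customer1'] * 4,
    'User': ['user1', 'user2', 'user1', 'user1'],
    'Application': ['app1', 'app2', 'app2', 'app1'],
    'TimeSinceApplication': np.nan,  # filled in by the loop below
})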
In Python, I wrote the following:
unique_users = df.User.unique().tolist()
for user in unique_users:
    access_events_for_user = df[df.User == user]
    applications_used = dict()  # application -> timestamp of its last access
    for current_access_event_index in access_events_for_user.index:
        current_access_event_ts = df.loc[current_access_event_index, 'TimeStamp']
        application = df.loc[current_access_event_index, 'Application']
        if application in applications_used:
            time_since = (current_access_event_ts -
                          applications_used[application]).total_seconds()
            df.loc[current_access_event_index, 'TimeSinceApplication'] = time_since
        else:
            # First time this user has touched the application: 30-day default
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
        applications_used[application] = current_access_event_ts
It spits out something like this:
+---------------------+-----------+-------+-------------+----------------------+
| TimeStamp | Customer | User | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 | 2592000 |
| 2017-01-01 00:01:01 | customer1 | user2 | app2 | 2592000 |
| 2017-01-01 00:02:01 | customer1 | user1 | app2 | 2592000 |
| 2017-01-01 00:03:01 | customer1 | user1 | app1 | 180 |
+---------------------+-----------+-------+-------------+----------------------+
Basically, I'm trying to compute the time since the user last accessed each application. If it's the first time the user has accessed the application, I just set it to a default of 30 days (2592000 seconds). We can partition the data by customer and order it by timestamp so that it's in sequence. I'm just unsure how to do this in Spark without calling collect(), like the answers here do, which would defeat the purpose of Spark. Is this even possible?
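For what it's worth, the direction I've been sketching is a lag() over a window partitioned by (Customer, User, Application) and ordered by TimeStamp; I'm not at all sure this is the right approach (partitioning down to the application level is my own guess, everything else matches the columns above):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)  # the pandas frame from above

# Previous access to the same application by the same customer/user,
# in timestamp order. lag() is null on the first row of each partition.
w = Window.partitionBy('Customer', 'User', 'Application').orderBy('TimeStamp')
prev_ts = F.lag('TimeStamp').over(w)

sdf = sdf.withColumn(
    'TimeSinceApplication',
    F.when(prev_ts.isNull(), F.lit(2592000))  # first access: 30-day default
     .otherwise(F.col('TimeStamp').cast('long') - prev_ts.cast('long')))
sdf.orderBy('TimeStamp').show()

This would keep everything distributed (no collect()), if it actually behaves the way I think it does.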