
I would like to preface this question by saying I'm a Spark noob (I just started reading a book four days ago). Nevertheless, I'm trying to port over something I wrote with the help of the pandas library in Python so that I can take advantage of the cluster we just spun up. The data in the pandas DataFrame df looks like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |                  NaN |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |                  NaN |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |                  NaN |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  NaN |
+---------------------+-----------+-------+-------------+----------------------+

In Python, I wrote the following:

unique_users = df.User.unique().tolist()
for j in range(0, len(unique_users)):
    user = unique_users[j]
    access_events_for_user = df[df.User == user].copy()
    indexes_for_access_events = access_events_for_user.index
    applications_used = dict()
    for i in range(0, len(indexes_for_access_events)):
        current_access_event_index = int(indexes_for_access_events[i])
        current_access_event_ts = df.loc[current_access_event_index].TimeStamp
        current_application = df.loc[current_access_event_index].Application
        if i == 0:
            # First event for this user: fall back to the 30-day default
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
            applications_used[current_application] = current_access_event_ts
            continue
        if current_application in applications_used:
            time_since = (current_access_event_ts -
                          applications_used[current_application]).total_seconds()
            df.loc[current_access_event_index, 'TimeSinceApplication'] = time_since
        else:
            # First time this user has accessed this application: 30-day default
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
        applications_used[current_application] = current_access_event_ts

It spits out something like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |              2592000 |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |              2592000 |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |              2592000 |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  180 |
+---------------------+-----------+-------+-------------+----------------------+

Basically, I'm trying to get the time since the user last visited the application. If it's the first time the user has accessed the application, I just set it to the default of 30 days. We can partition the data by customer and order it by timestamp so that it is in order. I'm just unsure how to do this in Spark without calling collect() like the answers in here do, which would defeat the purpose of Spark. Is this even possible?
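
For reference, I think the explicit loops above can be replaced with a groupby/diff in pandas (a rough sketch, assuming TimeStamp is already a datetime column and grouping by Customer, User and Application):

df = df.sort_values('TimeStamp')
# Difference from the previous access of the same application by the same user
deltas = df.groupby(['Customer', 'User', 'Application'])['TimeStamp'].diff()
# The first access has no previous timestamp, so fall back to the 30-day default
df['TimeSinceApplication'] = deltas.dt.total_seconds().fillna(2592000)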

the_Kid26

2 Answers


This is approaching the limit of complexity that's possible with the DataFrame API. Someone else may be able to suggest a method of doing this with DataFrames, but personally I think the RDD API is much more suited to this. Here's an example to give you an idea of how to structure your algorithms for Spark:

from datetime import datetime, timedelta

data = [(datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1'),
        (datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2'),
        (datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2'),
        (datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1')]

rdd = sc.parallelize(data)

def toTimeSince(row):
    # row looks like ((customer, user, app), [timestamps])
    cust_user_app, timestamps = row
    timestamps = sorted(timestamps)
    # First access of this app by this user: default to 30 days
    result = [(timestamps[0], *cust_user_app, timedelta(30))]
    previous_timestamp = timestamps[0]
    for timestamp in timestamps[1:]:
        result.append((timestamp, *cust_user_app, timestamp - previous_timestamp))
        previous_timestamp = timestamp
    return result

(rdd
 .map(lambda row: (row[1:], [row[0]])) # Data looks like ((customer, user, app), [timestamp])
 .reduceByKey(lambda a, b: a + b) # Data looks like ((customer, user, app), list_of_timestamps)
 .flatMap(toTimeSince) # Data looks like (timestamp, customer, user, app, time_since_previous)
 .collect())

Result:

[(datetime.datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2', datetime.timedelta(30)),
 (datetime.datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2', datetime.timedelta(30)),
 (datetime.datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1', datetime.timedelta(30)),
 (datetime.datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1', datetime.timedelta(0, 180))]

The key points are:

  • The algorithm as you've described it is not inherently suited to Spark - there is a strong dependence between rows (every row must be calculated by comparing to another row), which is difficult to parallelize.
  • My suggestion uses Spark to aggregate a list of timestamps for records with the same customer, user and app. Following this, it's easy to sort the timestamps for each customer-user-app combination and expand back out into the dataset you want.
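
The collect() above is just to show the result; to keep the data distributed you can convert the final RDD back to a DataFrame instead. A minimal sketch, assuming an active SparkSession and converting the timedelta to seconds so the schema can be inferred:

result_rdd = (rdd
              .map(lambda row: (row[1:], [row[0]]))
              .reduceByKey(lambda a, b: a + b)
              .flatMap(toTimeSince)
              # Convert the timedelta to seconds so Spark can infer a schema
              .map(lambda r: (r[0], r[1], r[2], r[3], r[4].total_seconds())))

result_df = result_rdd.toDF(
    ['TimeStamp', 'Customer', 'User', 'Application', 'TimeSinceApplication'])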
timchap

It's possible: you have to use a window function in PySpark, and your window will be partitioned by user and application. Then give each row a rank; if the rank is one, set your default value, otherwise use current time minus previous time. I think that's what you wanted to do.

In SQL terms you would use a PARTITION BY clause, but to do this in PySpark you have to use a Window. Hope this will solve your problem; I'm a bit too lazy to write the code, sorry for that.
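
A minimal sketch of that approach, assuming df is a Spark DataFrame with the columns from the question; it partitions by Customer, User and Application and uses lag() over the window to fetch the previous timestamp rather than an explicit rank:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per customer/user/application combination, ordered by time
w = Window.partitionBy('Customer', 'User', 'Application').orderBy('TimeStamp')

prev_ts = F.lag('TimeStamp').over(w)
result = df.withColumn(
    'TimeSinceApplication',
    F.when(prev_ts.isNull(), F.lit(2592000))  # first access: 30-day default
     .otherwise(F.col('TimeStamp').cast('long') - prev_ts.cast('long')))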

Ankit Kumar Namdeo