
I'm trying to figure out whether what I want to accomplish is even possible in Spark. Let's say I have a CSV that, when read in as a DataFrame, looks like this:

+---------------------+-----------+-------+-------------+
|      TimeStamp      | Customer  | User  | Application |
+---------------------+-----------+-------+-------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |
| 2017-01-01 12:00:05 | customer1 | user1 | app1        |
| 2017-01-01 14:00:03 | customer1 | user2 | app2        |
| 2017-01-01 23:50:50 | customer1 | user1 | app1        |
| 2017-01-02 00:00:02 | customer1 | user1 | app1        |
+---------------------+-----------+-------+-------------+

I'm trying to produce a DataFrame with an extra column that gives, for each row, the number of unique users from that customer who have visited the application in the 24 hours before the row's timestamp. So the result would look like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | UniqueUserVisitedApp |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |                    0 |
| 2017-01-01 12:00:05 | customer1 | user2 | app1        |                    1 |
| 2017-01-01 13:00:05 | customer1 | user2 | app1        |                    2 |
| 2017-01-01 14:00:03 | customer1 | user1 | app1        |                    2 |
| 2017-01-01 23:50:50 | customer1 | user3 | app1        |                    2 |
| 2017-01-01 23:50:51 | customer2 | user4 | app2        |                    0 |
| 2017-01-02 00:00:02 | customer1 | user1 | app1        |                    3 |
+---------------------+-----------+-------+-------------+----------------------+
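
To pin down the semantics that table implies, here is a minimal plain-Python sketch (not Spark code). It assumes the window covers the 24 hours strictly before each row's timestamp and that rows are matched on both customer and application. The function name `unique_users_last_24h` is made up for illustration, and the quadratic scan is only meant as a readable reference, not something to run on large data.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"

# Rows copied from the expected-output table: (TimeStamp, Customer, User, Application).
events = [
    ("2017-01-01 00:00:01", "customer1", "user1", "app1"),
    ("2017-01-01 12:00:05", "customer1", "user2", "app1"),
    ("2017-01-01 13:00:05", "customer1", "user2", "app1"),
    ("2017-01-01 14:00:03", "customer1", "user1", "app1"),
    ("2017-01-01 23:50:50", "customer1", "user3", "app1"),
    ("2017-01-01 23:50:51", "customer2", "user4", "app2"),
    ("2017-01-02 00:00:02", "customer1", "user1", "app1"),
]


def unique_users_last_24h(rows):
    """For each row, count the distinct users seen for the same customer and
    application in the 24 hours strictly before that row's timestamp."""
    parsed = [(datetime.strptime(ts, FMT), c, u, a) for ts, c, u, a in rows]
    counts = []
    for ts, customer, _, app in parsed:
        window_start = ts - timedelta(hours=24)
        users = {
            u
            for other_ts, c, u, a in parsed
            if c == customer and a == app and window_start <= other_ts < ts
        }
        counts.append(len(users))
    return counts


counts = unique_users_last_24h(events)
for (ts, *_), n in zip(events, counts):
    print(ts, n)
```

Under those assumptions the sketch reproduces the UniqueUserVisitedApp column above, including the last row, where the visit at 00:00:01 has just dropped out of the window.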

I can do a tumbling window with the code below, but that's not quite what we are looking for.

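import org.apache.spark.sql.functions.{col, countDistinct, window}

// Read the CSV with its header row so the columns can be referenced by name.
val data = spark.read.option("header", "true").csv("path/to/csv")

// Count distinct users per customer and application in fixed 24-hour windows.
val tumblingWindow = data
    .groupBy(col("Customer"), col("Application"), window(col("TimeStamp").cast("timestamp"), "24 hours"))
    .agg(countDistinct("User").as("UniqueUsersVisitedApp"))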

The result is this:

+-----------+-------------+-------------------------+-----------------------+
| Customer  | Application |         Window          | UniqueUsersVisitedApp |
+-----------+-------------+-------------------------+-----------------------+
| customer1 | app1        | [2017-01-01 00:00:00... |                     2 |
| customer2 | app2        | [2017-01-01 00:00:00... |                     1 |
| customer1 | app1        | [2017-01-02 00:00:00... |                     1 |
+-----------+-------------+-------------------------+-----------------------+
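
A small plain-Python sketch of why fixed buckets give this result: each event is assigned to a 24-hour bucket aligned to 1970-01-01 00:00:00 UTC, which is how Spark's `window` aligns intervals when no start offset is given. The helper `tumbling_bucket_start` is a made-up name, and treating the sample timestamps as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone

DAY = timedelta(hours=24)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_bucket_start(ts):
    """Start of the fixed 24-hour bucket containing ts (buckets aligned to the epoch)."""
    return ts - (ts - EPOCH) % DAY


# Two events from the sample data, assumed to be in UTC.
late = datetime(2017, 1, 1, 23, 50, 50, tzinfo=timezone.utc)
early_next_day = datetime(2017, 1, 2, 0, 0, 2, tzinfo=timezone.utc)

print(tumbling_bucket_start(late))            # bucket starting 2017-01-01 00:00:00
print(tumbling_bucket_start(early_next_day))  # bucket starting 2017-01-02 00:00:00
print(early_next_day - late)                  # the events are only 0:09:12 apart
```

The two events are minutes apart but fall into different buckets, so a tumbling window cannot express "the 24 hours before this row".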

Any help would be much appreciated.

himanshuIIITian
the_Kid26

2 Answers

0

I tried this with a PySpark window function, creating a subpartition for each date and applying a count over it. I'm not sure how efficient it is. Here is my code snippet:

>>> from pyspark.sql import Window
>>> from pyspark.sql import functions as F

>>> l = [('2017-01-01 00:00:01','customer1','user1','app1'),('2017-01-01 12:00:05','customer1','user1','app1'),('2017-01-01 14:00:03','customer1','user2','app2'),('2017-01-01 23:50:50','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user1','app1'),('2017-01-02 12:00:02','customer1','user1','app1'),('2017-01-03 14:00:02','customer1','user1','app1'),('2017-01-02 00:00:02','customer1','user2','app2'),('2017-01-01 16:04:01','customer1','user1','app1'),('2017-01-01 23:59:01','customer1','user1','app1'),('2017-01-01 18:00:01','customer1','user2','app2')]
>>> df = spark.createDataFrame(l,['TimeStamp','Customer','User','Application'])
>>> df = df.withColumn('TimeStamp',df['TimeStamp'].cast('timestamp')).withColumn('Date',F.to_date(F.col('TimeStamp')))
>>> df.show()
+-------------------+---------+-----+-----------+----------+
|          TimeStamp| Customer| User|Application|      Date|
+-------------------+---------+-----+-----------+----------+
|2017-01-01 00:00:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 12:00:05|customer1|user1|       app1|2017-01-01|
|2017-01-01 14:00:03|customer1|user2|       app2|2017-01-01|
|2017-01-01 23:50:50|customer1|user1|       app1|2017-01-01|
|2017-01-02 00:00:02|customer1|user1|       app1|2017-01-02|
|2017-01-02 12:00:02|customer1|user1|       app1|2017-01-02|
|2017-01-03 14:00:02|customer1|user1|       app1|2017-01-03|
|2017-01-02 00:00:02|customer1|user2|       app2|2017-01-02|
|2017-01-01 16:04:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 23:59:01|customer1|user1|       app1|2017-01-01|
|2017-01-01 18:00:01|customer1|user2|       app2|2017-01-01|
+-------------------+---------+-----+-----------+----------+

>>> df.printSchema()
root
 |-- TimeStamp: timestamp (nullable = true)
 |-- Customer: string (nullable = true)
 |-- User: string (nullable = true)
 |-- Application: string (nullable = true)
 |-- Date: date (nullable = true)

>>> w = Window.partitionBy('Customer','User','Application','Date').orderBy('Timestamp')
>>> diff = F.coalesce(F.datediff("TimeStamp", F.lag("TimeStamp", 1).over(w)), F.lit(0))
>>> subpartition = F.count(diff<1).over(w)
>>> df.select("*",(subpartition-1).alias('Count')).drop('Date').orderBy('Customer','User','Application','TimeStamp').show()
+-------------------+---------+-----+-----------+-----+
|          TimeStamp| Customer| User|Application|Count|
+-------------------+---------+-----+-----------+-----+
|2017-01-01 00:00:01|customer1|user1|       app1|    0|
|2017-01-01 12:00:05|customer1|user1|       app1|    1|
|2017-01-01 16:04:01|customer1|user1|       app1|    2|
|2017-01-01 23:50:50|customer1|user1|       app1|    3|
|2017-01-01 23:59:01|customer1|user1|       app1|    4|
|2017-01-02 00:00:02|customer1|user1|       app1|    0|
|2017-01-02 12:00:02|customer1|user1|       app1|    1|
|2017-01-03 14:00:02|customer1|user1|       app1|    0|
|2017-01-01 14:00:03|customer1|user2|       app2|    0|
|2017-01-01 18:00:01|customer1|user2|       app2|    1|
|2017-01-02 00:00:02|customer1|user2|       app2|    0|
+-------------------+---------+-----+-----------+-----+
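
For readers without a Spark session, the Count column above can be reproduced by a plain-Python sketch of what the window computes on this sample: a running index within each (Customer, User, Application, calendar date) group. The function name `running_count_per_day` is made up, and the sketch assumes there are no duplicate timestamps within a group.

```python
from collections import defaultdict

# Rows from the sample DataFrame: (TimeStamp, Customer, User, Application).
rows = [
    ("2017-01-01 00:00:01", "customer1", "user1", "app1"),
    ("2017-01-01 12:00:05", "customer1", "user1", "app1"),
    ("2017-01-01 14:00:03", "customer1", "user2", "app2"),
    ("2017-01-01 23:50:50", "customer1", "user1", "app1"),
    ("2017-01-02 00:00:02", "customer1", "user1", "app1"),
    ("2017-01-02 12:00:02", "customer1", "user1", "app1"),
    ("2017-01-03 14:00:02", "customer1", "user1", "app1"),
    ("2017-01-02 00:00:02", "customer1", "user2", "app2"),
    ("2017-01-01 16:04:01", "customer1", "user1", "app1"),
    ("2017-01-01 23:59:01", "customer1", "user1", "app1"),
    ("2017-01-01 18:00:01", "customer1", "user2", "app2"),
]


def running_count_per_day(rows):
    """Number of earlier visits by the same user to the same app on the same date."""
    seen = defaultdict(int)
    out = []
    # Same ordering as the final orderBy: Customer, User, Application, TimeStamp.
    for ts, customer, user, app in sorted(rows, key=lambda r: (r[1], r[2], r[3], r[0])):
        key = (customer, user, app, ts[:10])  # ts[:10] is the calendar date
        out.append((ts, customer, user, app, seen[key]))
        seen[key] += 1
    return out


for row in running_count_per_day(rows):
    print(row)
```

The count resets at midnight because the date is part of the grouping key, so this is a per-calendar-day count for a single user, not a rolling 24-hour count.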
Suresh
  • Your solution counts the number of times a particular user has visited an application, not the number of times a unique user has visited an application. After the first entry, every count for user1 should be 1, except the one with the timestamp `2017-01-03 14:00:02`. – the_Kid26 Jul 26 '17 at 12:59
  • oh sorry, I missed that. Will check it. – Suresh Jul 26 '17 at 13:13
0

If I understand your question correctly, just apply a filter before doing the groupBy:

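from datetime import datetime, timedelta

# Cutoff: keep only events from the 24 hours before the current time.
now_minus_24_hours = datetime.now() - timedelta(hours=24)
data = spark.read.csv('path/to/csv', header=True)

result = (data
          .filter(data['TimeStamp'].cast('timestamp') > now_minus_24_hours)
          .groupBy(["Customer", "Application", "User"])
          .count())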

Note that users who haven't visited in the last 24 hours will be missing from the DataFrame, instead of having a count of zero.

Edit

If you are trying to get the number of visits in the last 24 hours relative to each timestamp, you can do something similar to my answer here. The basic steps will be:

  1. reduceByKey to get a list of timestamps for each user/app/customer combination (identical to the other example). Each row will now be in the form:

    ((user, app, customer), list_of_timestamps)

  2. Process each list of timestamps to generate a list of "number of visits in the previous 24 hours" for each timestamp. The data will now be in the form:

    ((user, app, customer), [(ts_0, num_visits_24hr_before_ts_0), (ts_1, num_visits_24hr_before_ts_1), ...])

  3. flatMap each row back to multiple rows using something like:

    lambda row: [(*row[0], *ts_num_visits) for ts_num_visits in row[1]]
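
The three steps above can be sketched in plain Python, with a dict standing in for the keyed RDD. This is an illustration under assumptions, not the linked answer's code: the helper `visits_in_previous_24h` is a made-up name, the sample rows are the question's input table, and the count covers one user's earlier visits in the preceding 24 hours.

```python
from collections import defaultdict
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"
DAY = timedelta(hours=24)

# Rows from the question's input table: (TimeStamp, Customer, User, Application).
rows = [
    ("2017-01-01 00:00:01", "customer1", "user1", "app1"),
    ("2017-01-01 12:00:05", "customer1", "user1", "app1"),
    ("2017-01-01 14:00:03", "customer1", "user2", "app2"),
    ("2017-01-01 23:50:50", "customer1", "user1", "app1"),
    ("2017-01-02 00:00:02", "customer1", "user1", "app1"),
]

# Step 1: collect the timestamps for each (user, app, customer) key.
grouped = defaultdict(list)
for ts, customer, user, app in rows:
    grouped[(user, app, customer)].append(datetime.strptime(ts, FMT))


def visits_in_previous_24h(timestamps):
    """Step 2: pair each timestamp with the number of earlier visits in the prior 24 hours."""
    ordered = sorted(timestamps)
    result = []
    start = 0  # index of the oldest visit still inside the current window
    for i, ts in enumerate(ordered):
        while ordered[start] < ts - DAY:
            start += 1
        result.append((ts, i - start))
    return result


# Step 3: flatten back to one row per visit.
flat = [
    (*key, ts.strftime(FMT), n)
    for key, visits in grouped.items()
    for ts, n in visits_in_previous_24h(visits)
]

for row in flat:
    print(row)
```

Because the timestamps are sorted, the sliding `start` index only moves forward, so each list is processed in a single pass after sorting.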

timchap
  • It's more of a running count of unique users who have accessed the application in the 24 hours before the timestamp of the row currently being considered. Your solution just filters for all access events that took place in the last 24 hours. I apologize if my question was ambiguous. – the_Kid26 Jul 27 '17 at 12:17
  • @the_Kid26 In that case, you should be able to adapt [my answer to your very similar question](https://stackoverflow.com/questions/45150084/iterating-throws-rows-of-a-dataframe-and-setting-value-in-spark/45150622#45150622) to get the result you want. – timchap Jul 27 '17 at 12:59
  • Edited my answer above to give an idea of what I mean. – timchap Jul 27 '17 at 13:16