
I have a huge dataset (billions of rows) that summarizes user behavior, e.g. an event type and the number of times a user performed that action.

Sample data looks like this

|user ID| event_type   |count|
|-------|--------------|-----|
|user_1 |prefix1_event1|1    |
|user_1 |prefix2_event1|2    |
|user_1 |prefix1_event2|1    |
|user_1 |prefix2_event2|2    |
|user_2 |prefix1_event1|1    |
|user_2 |prefix2_event1|2    |
|user_2 |prefix1_event2|1    |
|user_2 |prefix2_event2|2    |

The event types have a standard suffix (the suffixes come from a small, fixed list) with many possible prefixes. For each user and each event suffix, I need to find which event the user performed most often and how many times.

So the result would look like this:

|user ID| event_type   | count|
|-------|--------------|------|
|user_1 |prefix2_event1|2     |
|user_1 |prefix2_event2|2     |
|user_2 |prefix2_event1|2     |
|user_2 |prefix2_event2|2     |

I am struggling to do this as part of one aggregation. I can find the max count for each user and suffix with something like this:

max(when(col("event_type").endsWith("_event1"), col("count")))

I am unsure how to derive the corresponding event_type in the same aggregation. I tried something like this:

collect_set(
  when(col("event_type").endsWith("_event1") &&
       col("count") === max(when(col("event_type").endsWith("_event1"), col("count"))),
       col("event_type"))
).getItem(0).as(colname)

Basically, I tried to reuse the max expression as a sub-expression in the next aggregation, but it looks like Spark does not allow that. I get this error:

org.apache.spark.sql.AnalysisException: It is not allowed to use an aggregate function in the argument of another aggregate function. Please use the inner aggregate function in a sub-query

Any idea how I can achieve this as part of one aggregation (grouped by user ID)? PS: I am using Scala.


2 Answers


You can split the event suffix into its own column, aggregate to get the max count per user and suffix, and then join back to recover the matching event_type.

//Sample data creation
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("user_1", "prefix1_event1", 1), ("user_1", "prefix2_event1", 2),
  ("user_1", "prefix1_event2", 1), ("user_1", "prefix2_event2", 2),
  ("user_2", "prefix1_event1", 1), ("user_2", "prefix2_event1", 2),
  ("user_2", "prefix1_event2", 1), ("user_2", "prefix2_event2", 2)
).toDF("UserID", "event_type", "count")

//Extract the suffix (the part after the underscore) into its own column
val df1 = df.withColumn("event", split($"event_type", "_")(1))

//Group by user and suffix, taking the max count per group
val df2 = df1.groupBy("UserID", "event").agg(max("count").as("count"))

//Join back on (UserID, event, count) to recover the matching event_type
val finalDF = df2.join(df1, Seq("UserID", "event", "count"), "left")
  .drop("event")
  .orderBy("UserID")
  .select("UserID", "event_type", "count")

finalDF.show()
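Note that because the join matches on the max count, any ties within a user/suffix group come back as multiple rows, one per tied event_type; deduplicate afterwards if you need exactly one row per group.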

The output matches the expected result shown in the question.


You can calculate a row number within each partition of userID and event-type suffix, ordered by descending count, and keep the rows where the row number is 1 (i.e. the maximum count). This avoids a join.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Rank rows within each (userID, suffix) partition by descending count,
// then keep only the top-ranked row in each partition
val df2 = df.withColumn(
    "rn",
    row_number().over(
      Window.partitionBy($"userID", split($"event_type", "_")(1))
            .orderBy(desc("count"))
    )
).filter("rn = 1").drop("rn")

df2.show
+------+--------------+-----+
|userID|    event_type|count|
+------+--------------+-----+
|user_1|prefix2_event2|    2|
|user_2|prefix2_event1|    2|
|user_1|prefix2_event1|    2|
|user_2|prefix2_event2|    2|
+------+--------------+-----+
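If you specifically want this as a single aggregation, with no window and no join, a further option (a minimal sketch, reusing the df from the first answer) is to take the max of a struct whose first field is the count. Spark compares structs field by field, so the max struct carries both the top count and its event_type:

import org.apache.spark.sql.functions._
import spark.implicits._

// Structs compare field by field, so max(struct(count, event_type))
// picks the struct with the highest count in each (user, suffix) group
val result = df
  .withColumn("suffix", split($"event_type", "_")(1))
  .groupBy($"UserID", $"suffix")
  .agg(max(struct($"count", $"event_type")).as("best"))
  .select($"UserID", $"best.event_type", $"best.count")

result.show()

Unlike the join approach, ties on count are broken by the lexicographically largest event_type, so this always returns exactly one row per user and suffix.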