I have a group of distinct orders that look as follows:

import java.sql.Timestamp

case class Order(orderId: String, orderDetails: OrderDetail, destination: String)
case class OrderDetail(date: Timestamp, recipient: String, item: String)

val grouped = ordersDF.groupBy($"destination")

What I'm looking for is a way to find the most common item for each destination. Looking at the SQL functions available for aggregation, I don't see anything for sub-groupBys on the data. Turning the data into an RDD could work, but my understanding is that it's not the best practice.

I'd like to see something like:

|Destination | mostCommon |
|------------|------------|
|XYZ         | item x     |

TheM00s3

1 Answer

You could achieve this using a combination of groupBy/aggregate functions and window functions.

Let's consider this to be the ordersDf:

+-------+--------------+-----------+
|orderId|  orderDetails|destination|
+-------+--------------+-----------+
|      1|[11,abc,item1]|       loc1|
|      2|[12,abc,item2]|       loc1|
|      3|[13,abc,item1]|       loc1|
|      4|[14,abc,item1]|       loc2|
|      5|[15,abc,item2]|       loc2|
|      6|[11,abc,item2]|       loc2|
|      7|[11,abc,item2]|       loc2|
+-------+--------------+-----------+
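For anyone who wants to reproduce the table above, here is a minimal sketch of one way to build it. It assumes a local SparkSession and uses a plain Int for date to match the sample values (the question's case class uses a Timestamp); the intermediate name flat and the app name are made up for illustration. It is meant to be pasted into spark-shell or run as a Scala script.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

// Local session for experimentation only; the app name is arbitrary.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("most-common-item")
  .getOrCreate()
import spark.implicits._

// Flat rows: (orderId, date, recipient, item, destination).
// Assumption: date is an Int here to mirror the sample table, not a Timestamp.
val flat = Seq(
  (1, 11, "abc", "item1", "loc1"),
  (2, 12, "abc", "item2", "loc1"),
  (3, 13, "abc", "item1", "loc1"),
  (4, 14, "abc", "item1", "loc2"),
  (5, 15, "abc", "item2", "loc2"),
  (6, 11, "abc", "item2", "loc2"),
  (7, 11, "abc", "item2", "loc2")
).toDF("orderId", "date", "recipient", "item", "destination")

// Nest date, recipient and item into a single orderDetails struct column.
val ordersDf = flat.select(
  $"orderId",
  struct($"date", $"recipient", $"item").as("orderDetails"),
  $"destination"
)

ordersDf.show()
```

Building the struct with select keeps the nested field names (date, recipient, item), so "orderDetails.item" resolves the same way as in the steps that follow.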

First, group the data by destination and item and count the frequency of each item.

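import org.apache.spark.sql.functions.count

// Count how often each item occurs within each destination
val dfWithCount = ordersDf
  .groupBy("destination", "orderDetails.item")
  .agg(count("orderDetails.item").alias("itemCount"))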

The aggregated dataframe then looks like this:

+-----------+-----+---------+
|destination| item|itemCount|
+-----------+-----+---------+
|       loc1|item2|        1|
|       loc2|item1|        1|
|       loc2|item2|        3|
|       loc1|item1|        2|
+-----------+-----+---------+

Since we want the most common item per destination, let's partition by destination and apply the max aggregation over the itemCount column.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val maxWindowSpec = Window.partitionBy("destination")
val maxColumn = max($"itemCount").over(maxWindowSpec)
val dfWithMax = dfWithCount.withColumn("maxItemCount", maxColumn)

The resulting dataframe has both the itemCount of each item and the maxItemCount for its destination:

+-----------+-----+---------+------------+
|destination| item|itemCount|maxItemCount|
+-----------+-----+---------+------------+
|       loc1|item2|        1|           2|
|       loc1|item1|        2|           2|
|       loc2|item1|        1|           3|
|       loc2|item2|        3|           3|
+-----------+-----+---------+------------+

Finally, we keep only the rows whose itemCount equals the max item count for their destination and drop the helper columns. Note that if two items tie for the maximum at a destination, both rows are kept.

val result = dfWithMax
  .filter($"itemCount" === $"maxItemCount")
  .drop("maxItemCount", "itemCount")

result.show()

+-----------+-----+
|destination| item|
+-----------+-----+
|       loc1|item1|
|       loc2|item2|
+-----------+-----+
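If exactly one row per destination is needed even when counts tie, a possible variant is to rank the items with row_number instead of comparing against max. The sketch below is not part of the steps above; it assumes a local SparkSession, starts from already flattened (destination, item) pairs taken from the sample data, and breaks ties alphabetically by item name, which is an arbitrary rule chosen for illustration. The names orders, counts, byCountDesc and rn are made up here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, row_number}

// Local session for experimentation only; the app name is arbitrary.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("most-common-item")
  .getOrCreate()
import spark.implicits._

// Flattened (destination, item) pairs from the sample orders.
val orders = Seq(
  ("loc1", "item1"), ("loc1", "item2"), ("loc1", "item1"),
  ("loc2", "item1"), ("loc2", "item2"), ("loc2", "item2"), ("loc2", "item2")
).toDF("destination", "item")

// Frequency of each item within each destination.
val counts = orders
  .groupBy("destination", "item")
  .agg(count($"item").alias("itemCount"))

// Order items inside each destination: highest count first,
// then item name as an assumed tie-breaker.
val byCountDesc = Window
  .partitionBy("destination")
  .orderBy($"itemCount".desc, $"item".asc)

// Keep only the top-ranked item per destination and rename it
// to match the column name asked for in the question.
val mostCommon = counts
  .withColumn("rn", row_number().over(byCountDesc))
  .filter($"rn" === 1)
  .select($"destination", $"item".alias("mostCommon"))

mostCommon.show()
```

Because row_number assigns distinct ranks within a partition, the filter on rn leaves one row per destination. The max-based version is still the better fit when all tied items should be reported.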