
I have a 'Dataset' in Spark (Java) with data about the cabs of a city. Among its several columns, it has:

  • day in the form 2016-04-02, which is the day that the cab picked up a customer.

  • vendor_id, which is for example 1.

  • hour in the form of 2 or 16.

I want to find, for each vendor and each day, the hour with the maximum number of customers. So I think I should groupBy on these three columns. These are the first 2 rows I get after I groupBy on day, vendor_id, hour and count:

+----------+---------+----+-----+
|day       |vendor_id|hour|count|
+----------+---------+----+-----+
|2016-01-01|1        |2   |116  |
|2016-01-01|1        |1   |110  |
+----------+---------+----+-----+

How can I get the hour of each day of each vendor (the groups created by GroupBy) with the maximum count?

I have already seen that this can be solved with a join, but that and other examples group on only one column, whereas here I group on three.

If possible, I would prefer Java code that uses the Spark libraries. Thank you for your time.

baitmbarek
exch_cmmnt_memb
2 Answers


La-Tex, here's a Java code sample that solves the problem as I understand it. It keeps one single row per vendor and day. I added an extra filter that keeps the first row (by hour) when several rows share the same maximum count:

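import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;

// df is the grouped Dataset with columns day, vendor_id, hour, count
WindowSpec window = Window.partitionBy("vendor_id", "day");

Dataset<Row> withMaxDF = df
    .withColumn("maxCount", max(col("count")).over(window))          // highest count per vendor and day
    .where("count = maxCount")                                       // keep only the busiest hour(s)
    .withColumn("rnum", row_number().over(window.orderBy("hour")))   // number tied rows by hour
    .where("rnum = 1")                                               // keep the earliest hour on ties
    .drop("maxCount", "rnum");

withMaxDF.show();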

Output :

+-----+----------+----+---------+
|count|       day|hour|vendor_id|
+-----+----------+----+---------+
|  116|2016-01-01|   2|        1|
+-----+----------+----+---------+
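
To cross-check the result on a small sample without a Spark session, the same selection rule (highest count per vendor and day, lowest hour on ties) can be written in plain Java. This is only a sketch for verification, not a replacement for the window query: the class MaxHourPerGroup, the row type HourCount and the method busiestHours are hypothetical names, and the vendor 2 rows are made-up sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: recomputes the expected rows in memory.
public class MaxHourPerGroup {

    // One row of the grouped result: (day, vendor_id, hour, count).
    public static final class HourCount {
        public final String day;
        public final int vendorId;
        public final int hour;
        public final long count;

        public HourCount(String day, int vendorId, int hour, long count) {
            this.day = day;
            this.vendorId = vendorId;
            this.hour = hour;
            this.count = count;
        }
    }

    // The higher count wins; on equal counts the lower hour wins.
    private static boolean isBetter(HourCount candidate, HourCount best) {
        if (candidate.count != best.count) {
            return candidate.count > best.count;
        }
        return candidate.hour < best.hour;
    }

    // Returns one row per (day, vendor_id), in order of first appearance.
    public static List<HourCount> busiestHours(List<HourCount> rows) {
        Map<String, HourCount> bestPerGroup = new LinkedHashMap<>();
        for (HourCount row : rows) {
            String key = row.day + "|" + row.vendorId;
            bestPerGroup.merge(key, row,
                (best, candidate) -> isBetter(candidate, best) ? candidate : best);
        }
        return new ArrayList<>(bestPerGroup.values());
    }

    public static void main(String[] args) {
        List<HourCount> rows = Arrays.asList(
            new HourCount("2016-01-01", 1, 2, 116),
            new HourCount("2016-01-01", 1, 1, 110),
            new HourCount("2016-01-01", 2, 5, 90),   // made-up rows with a tie
            new HourCount("2016-01-01", 2, 3, 90));
        for (HourCount r : busiestHours(rows)) {
            System.out.println(r.day + " vendor " + r.vendorId
                + " hour " + r.hour + " count " + r.count);
        }
    }
}
```

For vendor 1 this picks the same row as the output above (hour 2, count 116); for the tied vendor 2 rows it keeps hour 3, which mirrors the rnum = 1 filter.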
baitmbarek

I used the Window class as @Salim suggested and it worked. Actually, I had already seen that it could be solved with Window, but I thought it would be easier using a join.

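import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import static org.apache.spark.sql.functions.*;

// Number of pickups per day, vendor and hour
Dataset<Row> df_dhv_grouped = df.groupBy(col("day"), col("vendor_id"), col("hour")).count();

// Attach the highest hourly count of each (day, vendor_id) group to every row
Dataset<Row> df_max_hours = df_dhv_grouped.withColumn("max_drives_hour",
        max("count").over(Window.partitionBy("day", "vendor_id")));

// Keep the rows whose count equals the group maximum (tied hours are all kept)
df_max_hours.filter(col("count").equalTo(col("max_drives_hour")))
        .orderBy(col("day").asc(), col("vendor_id").asc())
        .show();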

Thank you for your answers.

exch_cmmnt_memb