
I have a DataFrame similar to the following:

from pyspark.sql.functions import avg, first

rdd = sc.parallelize(
    [
        (0, "A", 223, "201603", "PORT"),
        (0, "A", 22, "201602", "PORT"),
        (0, "A", 22, "201603", "PORT"),
        (0, "C", 22, "201605", "PORT"),
        (0, "D", 422, "201601", "DOCK"),
        (0, "D", 422, "201602", "DOCK"),
        (0, "C", 422, "201602", "DOCK"),
        (1, "B", 3213, "201602", "DOCK"),
        (1, "A", 3213, "201602", "DOCK"),
        (1, "C", 3213, "201602", "PORT"),
        (1, "B", 3213, "201601", "PORT"),
        (1, "B", 3213, "201611", "PORT"),
        (1, "B", 3213, "201604", "PORT"),
        (3, "D", 3999, "201601", "PORT"),
        (3, "C", 323, "201602", "PORT"),
        (3, "C", 323, "201602", "PORT"),
        (3, "C", 323, "201605", "DOCK"),
        (3, "A", 323, "201602", "DOCK"),
        (2, "C", 2321, "201601", "DOCK"),
        (2, "A", 2321, "201602", "PORT")
    ]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

and I need to aggregate by id and type and get the most frequent ship per group. For example,

grouped = df_data.groupby('id','type', 'ship').count()

has a count column with the number of occurrences of each combination:

+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
|  3|   A|DOCK|    1|
|  0|   D|DOCK|    2|
|  3|   C|PORT|    2|
|  0|   A|PORT|    3|
|  1|   A|DOCK|    1|
|  1|   B|PORT|    3|
|  3|   C|DOCK|    1|
|  3|   D|PORT|    1|
|  1|   B|DOCK|    1|
|  1|   C|PORT|    1|
|  2|   C|DOCK|    1|
|  0|   C|PORT|    1|
|  0|   C|DOCK|    1|
|  2|   A|PORT|    1|
+---+----+----+-----+

and I need to get

+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
|  0|   D|DOCK|    2|
|  0|   A|PORT|    3|
|  1|   A|DOCK|    1|
|  1|   B|PORT|    3|
|  2|   C|DOCK|    1|
|  2|   A|PORT|    1|
|  3|   C|PORT|    2|
|  3|   A|DOCK|    1|
+---+----+----+-----+

I tried to use a combination of

grouped.groupby('id', 'type', 'ship')\
.agg({'count':'max'}).orderBy('max(count)', ascending=False).\
groupby('id', 'type', 'ship').agg({'ship':'first'})

But it fails. Is there a way to get the row with the maximum count per group from a group by?

In pandas this one-liner does the job:

df_pd = df_data.toPandas()
df_pd_t = df_pd[df_pd['count'] == df_pd.groupby(['id','type', ])['count'].transform(max)]
Ivan
  • Possible duplicate of [Find maximum row per group in Spark DataFrame](http://stackoverflow.com/questions/35218882/find-maximum-row-per-group-in-spark-dataframe) –  Oct 18 '16 at 19:16
  • The indicated post has only one dimension on the group by. It is not clear how to expand it with the three methods on that post. – Ivan Oct 18 '16 at 19:18
  • It doesn't affect the answer. Just more columns to put in partitionBy or groupBy. –  Oct 18 '16 at 19:19
  • Not really. I tried to do the join method and the window method and they don't work if the group by has more than one dimension - they only consider the first one. – Ivan Oct 18 '16 at 19:22
  • Both work for me. Are you sure you use correct conditions? –  Oct 18 '16 at 19:41

1 Answer


Based on your expected output, it seems you are effectively grouping only by id and ship - grouped already contains the distinct combinations - keeping the row with the highest count per group, and then dropping duplicate rows based on the columns id, ship and count, taking the first type in sort order.

To accomplish this, we can use Window functions:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

# Rank rows within each (id, ship) partition:
# highest count first, ties broken by type.
window = (Window
          .partitionBy(grouped['id'],
                       grouped['ship'])
          .orderBy(grouped['count'].desc(), grouped['type']))


(grouped
 .select('*', rank()
         .over(window)
         .alias('rank'))
 .filter(col('rank') == 1)                  # keep only the top-ranked row per partition
 .orderBy(col('id'))
 .dropDuplicates(['id', 'ship', 'count'])   # collapse exact ties on count
 .drop('rank')
 .show())
+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
|  0|   D|DOCK|    2|
|  0|   A|PORT|    3|
|  1|   A|DOCK|    1|
|  1|   B|PORT|    3|
|  2|   C|DOCK|    1|
|  2|   A|PORT|    1|
|  3|   A|DOCK|    1|
|  3|   C|PORT|    2|
+---+----+----+-----+
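
As a rough alternative sketch, the per-group maximum can also be obtained with a plain aggregation and a self-join, mirroring the transform(max) idea from the pandas one-liner; max_count below is just an illustrative alias, not anything required by the API:

from pyspark.sql.functions import col, max as max_

# Max count per (id, ship) group, then join it back to keep only matching rows.
max_counts = (grouped
              .groupBy('id', 'ship')
              .agg(max_('count').alias('max_count')))

(grouped
 .join(max_counts, on=['id', 'ship'])
 .filter(col('count') == col('max_count'))
 .drop('max_count')
 .orderBy('id')
 .show())

Unlike the rank() plus dropDuplicates() version above, this variant keeps every row that ties for the maximum count (for example both type A and type C for id 3, ship DOCK), so an extra tie-breaking step would be needed to match the output above exactly.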
mtoto