
My data looks like this:

   id | duration | action1 | action2 | ...
 ---------------------------------------------
    1 | 10       |   A     |   D
    1 | 10       |   B     |   E 
    2 | 25       |   A     |   E
    1 | 7        |   A     |   G

I want to group it by ID (which works great!):

df.rdd.groupBy(lambda x: x['id']).mapValues(list).collect()

And now I would like to group values within each group by duration to get something like this:

    [(id=1,
       [(duration=10, [(action1=A, action2=D), (action1=B, action2=E)]),
        (duration=7,  [(action1=A, action2=G)])]),

     (id=2,
       [(duration=25, [(action1=A, action2=E)])])]

And here is where I don't know how to do a nested group by. Any tips?

ka_boom

1 Answer


There is no need to serialize to an RDD. Here's a generalized way to group by multiple columns and aggregate the remaining columns into lists without hard-coding all of them:
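For reference, here is a minimal sketch that reconstructs the sample DataFrame from the question (a SparkSession is assumed; column names and values are taken from the data shown above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reconstruction of the sample data shown in the question
df = spark.createDataFrame(
    [(1, 10, "A", "D"),
     (1, 10, "B", "E"),
     (2, 25, "A", "E"),
     (1, 7, "A", "G")],
    ["id", "duration", "action1", "action2"],
)

With a df like that in hand: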

from pyspark.sql.functions import collect_list
grouping_cols = ["id", "duration"]
other_cols = [c for c in df.columns if c not in grouping_cols]
df.groupBy(grouping_cols).agg(*[collect_list(c).alias(c) for c in other_cols]).show()
#+---+--------+-------+-------+
#| id|duration|action1|action2|
#+---+--------+-------+-------+
#|  1|      10| [A, B]| [D, E]|
#|  2|      25|    [A]|    [E]|
#|  1|       7|    [A]|    [G]|
#+---+--------+-------+-------+

Update

If you need to preserve the order of the actions, the best way is to use a pyspark.sql.Window with an orderBy(). This is because there seems to be some ambiguity as to whether or not a groupBy() following an orderBy() maintains that order.

Suppose your timestamps are stored in a column "ts". You should be able to do the following:

from pyspark.sql import Window
w = Window.partitionBy(grouping_cols).orderBy("ts")
grouped_df = df.select(
    *(grouping_cols + [collect_list(c).over(w).alias(c) for c in other_cols])
).distinct()
pault
  • This does get me closer to where I need to be! But can I sort within the aggregated column? Afraid that it will get out of order. – ka_boom May 03 '18 at 21:34
  • Can you give an example where the order matters? – pault May 03 '18 at 21:56
  • In column *action1* it can be either [A,B] or [B,A]. I would like to make sure that the order is preserved by field that has a timestamp for each action. Does that help? – ka_boom May 03 '18 at 22:55
  • @ka_boom I added some code to maintain the order. I hope this answers your question. – pault May 04 '18 at 01:02
  • yes! it got me *almost* there! There is an unintended consequence - the amount of rows stays the same, and the last row for a specific ID will have a list of all items. but the rest of the rows are accumulating the list up to that point in time (ts column). I can add a workaround manually, but was wondering if there is a way to just get the last row with the full list. Thanks a lot – ka_boom May 08 '18 at 18:36
  • 1
    @ka_boom since you said "last row" I'm assuming there's some well defined order to your data. In this case, just filter your data to select the [maximum per group](https://stackoverflow.com/questions/35218882/find-maximum-row-per-group-in-spark-dataframe). For more detail, please create a new question or edit this question to include a [mcve] that demonstrates your issue and show the desired result. – pault May 08 '18 at 18:47
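For completeness, here is a rough sketch of the "maximum row per group" approach suggested in the last comment, assuming the same "ts" column as in the update above (the names cumulative, rank_w, full_lists and rn are illustrative, not from the original answer):

from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list, row_number

# Running lists per group, ordered by ts (keep ts so we can find the last row)
w = Window.partitionBy(grouping_cols).orderBy("ts")
cumulative = df.select(
    *(grouping_cols + ["ts"] + [collect_list(c).over(w).alias(c) for c in other_cols])
)

# Rank rows within each group by ts, latest first, and keep only the fullest row
rank_w = Window.partitionBy(grouping_cols).orderBy(col("ts").desc())
full_lists = (
    cumulative
    .withColumn("rn", row_number().over(rank_w))
    .filter(col("rn") == 1)
    .drop("rn", "ts")
)

Each group then keeps only the row whose accumulated lists cover every action, which avoids the intermediate partial-list rows the comment describes.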