
I have a dataframe with the following columns - User, Order, Food.

For example:

df = spark.createDataFrame(pd.DataFrame([['A','B','A','C','A'],[1,1,2,1,3],['Eggs','Salad','Peaches','Bread','Water']],index=['User','Order','Food']).T)

I would like to concatenate all of the foods for each user into a single string, sorted by Order.

If I run the following:

df.groupBy("User").agg(concat_ws(" $ ",collect_list("Food")).alias("Food List"))

I get a single string per user, but the foods are not concatenated in order.

User Food List
B   Salad
C   Bread
A   Eggs $ Water $ Peaches

What is a good way to get the food list concatenated in order?

Eyal S.
  • Possible duplicate of https://stackoverflow.com/questions/46580253/collect-list-by-preserving-order-based-on-another-variable – SMaZ Aug 28 '19 at 17:30

2 Answers


Based on the possible duplicate linked in the comments (collect_list by preserving order based on another variable), I was able to come up with a solution.

First define a sorter function. It takes the list of (Order, Food) structs collected for one user, sorts them by Order, and returns the Food values as a single string separated by ' $ '.

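from pyspark.sql.functions import collect_list, struct, udf
from pyspark.sql.types import StringType

# define udf: sort the collected (Order, Food) structs by Order, then join the foods
def sorter(l):
  res = sorted(l, key=lambda x: x.Order)
  return ' $ '.join([item.Food for item in res])

sort_udf = udf(sorter, StringType())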

Then create the struct and run the sorter function:

SortedFoodList = (df.groupBy("User")
                    .agg(collect_list(struct("Order","Food")).alias("food_list"))
                    .withColumn("sorted_foods",sort_udf("food_list"))
                    .drop("food_list")
                  )
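To see what the UDF does to each group, the sorter logic can be exercised without a Spark session. This is only a minimal sketch: `FoodRow` is a hypothetical namedtuple standing in for the `Row` objects Spark passes to the UDF, and the input list is deliberately out of order to mimic what `collect_list` may return.

```python
from collections import namedtuple

# Hypothetical stand-in for the Row objects Spark hands to the UDF
FoodRow = namedtuple("FoodRow", ["Order", "Food"])

def sorter(rows):
    # Sort the collected structs by Order, then join the Food values
    ordered = sorted(rows, key=lambda r: r.Order)
    return " $ ".join(r.Food for r in ordered)

# collect_list gives no ordering guarantee, so simulate a shuffled group for user A
group_a = [FoodRow(1, "Eggs"), FoodRow(3, "Water"), FoodRow(2, "Peaches")]
result = sorter(group_a)
print(result)
```

Accessing the fields by name (`r.Order`, `r.Food`) rather than by position keeps the function correct even if the order of the columns inside `struct(...)` changes.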
Ryan
Eyal S.

Try using a window here:

  1. Build the DataFrame
import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

df = spark.createDataFrame(pd.DataFrame([['A','B','A','C','A'],[1,1,2,1,3],['Eggs','Salad','Peaches','Bread','Water']],index=['User','Order','Food']).T)
df.show()

+----+-----+-------+
|User|Order|   Food|
+----+-----+-------+
|   A|    1|   Eggs|
|   B|    1|  Salad|
|   A|    2|Peaches|
|   C|    1|  Bread|
|   A|    3|  Water|
+----+-----+-------+

  2. Create a window and apply a UDF to join the strings:
w = Window.partitionBy('User').orderBy('Order').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

@pandas_udf(StringType(), PandasUDFType.GROUPED_AGG)
def _udf(v):
    return ' $ '.join(v)

df = df.withColumn('Food List', _udf(df['Food']).over(w)).dropDuplicates(['User', 'Food List']).drop(*['Order', 'Food'])
df.show(truncate=False)

+----+----------------------+
|User|Food List             |
+----+----------------------+
|B   |Salad                 |
|C   |Bread                 |
|A   |Eggs $ Peaches $ Water|
+----+----------------------+
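As a sanity check on the expected output, the same group, sort, and join steps can be reproduced in plain Python on the sample rows. This is an illustrative sketch only, not a replacement for the Spark code: the `rows` list and the `food_lists` name are assumptions made for the example.

```python
from collections import defaultdict

# Sample data from the question as (User, Order, Food) tuples
rows = [
    ("A", 1, "Eggs"),
    ("B", 1, "Salad"),
    ("A", 2, "Peaches"),
    ("C", 1, "Bread"),
    ("A", 3, "Water"),
]

# Group the (Order, Food) pairs by user
grouped = defaultdict(list)
for user, order, food in rows:
    grouped[user].append((order, food))

# Sorting the tuples orders them by Order first, then the foods are joined
food_lists = {
    user: " $ ".join(food for _, food in sorted(pairs))
    for user, pairs in grouped.items()
}
print(food_lists)
```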

niuer