
I have a dataframe like the one below, and I want to add a column 'rating_list' that groups by id and puts the ratings into a list where the list index is the item number

id | item | rating
1  | 1    | 5
1  | 2    | 4
1  | 4    | 5
1  | 7    | 3
2  | 5    | 3
2  | 2    | 5
2  | 3    | 5

would ideally result in

id | rating_list
1  | [5,4,0,5,0,0,3]
2  | [0,5,5,0,3,0,0]

where the length of the rating_list is the number of distinct items in the dataframe. So far I have a dataframe with a list of items and a list of ratings, but I'm not sure whether this is even the appropriate intermediate step:

id | item_list | rating_list
1  | [1,2,4,7] | [5,4,5,3]
2  | [2,3,5]   | [5,5,3]
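
One way to build that intermediate (a sketch, assuming the original PySpark dataframe is named df):

from pyspark.sql import functions as F

#group by id and collect the items and ratings per id into lists
#note: collect_list does not guarantee element order
intermediate = df.groupBy('id').agg(F.collect_list('item').alias('item_list'),
                                    F.collect_list('rating').alias('rating_list'))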

This will be a huge dataframe, so I'd prefer a fast approach.

user13591820

3 Answers


Try this for Spark 2.4+.

Using a window partitioned by a literal lets us keep the partitions loaded and compute the max without doing a collect operation.

df.show() #sampledataframe

#+---+----+------+
#|id |item|rating|
#+---+----+------+
#|1  |1   |5     |
#|1  |2   |4     |
#|1  |4   |5     |
#|1  |7   |3     |
#|2  |5   |3     |
#|2  |2   |5     |
#|2  |3   |5     |
#+---+----+------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# window over the whole dataframe, partitioned by a literal
w = Window().partitionBy(F.lit(0))

# 1. build the full item range 1..max(item) as an array column
# 2. per id, collect the items and ratings, and keep one copy of the full range
# 3. zip (items present + items missing) with the ratings and sort by item number
# 4. missing items end up paired with null, so replace those nulls with 0
df.withColumn("items", F.sequence(F.lit(1), F.max("item").over(w), F.lit(1)))\
  .groupBy("id").agg(F.collect_list("item").alias("item"),
                     F.collect_list("rating").alias("rating"),
                     F.first("items").alias("items"))\
  .withColumn("rating",
              F.sort_array(F.arrays_zip(F.flatten(F.array("item", F.array_except("items", "item"))), "rating")))\
  .select("id", F.expr("""transform(rating.rating, x -> IF(x is null, 0, x))""").alias("rating_list"))\
  .show(truncate=False)

#+---+---------------------+
#|id |rating_list          |
#+---+---------------------+
#|1  |[5, 4, 0, 5, 0, 0, 3]|
#|2  |[0, 5, 5, 0, 3, 0, 0]|
#+---+---------------------+
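
For comparison, a collect-based variant of the same idea (just a sketch, not part of the original answer; the window-vs-collect trade-off is discussed in the comments below) pulls max(item) to the driver first instead of using the literal-partitioned window:

from pyspark.sql import functions as F

# collect the maximum item number to the driver once
max_item = df.agg(F.max("item").alias("max_item")).first()["max_item"]

df.groupBy("id").agg(F.collect_list("item").alias("item"),
                     F.collect_list("rating").alias("rating"))\
  .withColumn("items", F.sequence(F.lit(1), F.lit(max_item)))\
  .withColumn("rating",
              F.sort_array(F.arrays_zip(F.flatten(F.array("item", F.array_except("items", "item"))), "rating")))\
  .select("id", F.expr("transform(rating.rating, x -> IF(x is null, 0, x))").alias("rating_list"))\
  .show(truncate=False)
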
murtihash
  • @anky I guess that would be a more pandas way of going about it – murtihash Jun 11 '20 at 18:27
  • @murtihash isn't this creating only one partition with `Window().partitionBy(F.lit(0))`? I think OP described a huge dataset, which makes it inefficient to bring all the data into one partition – abiratsis Jun 12 '20 at 08:14
  • @abiratsis Yes, I had in mind that OP has a huge dataset, and you can do a .rdd.getNumPartitions() on my code to see that it maintains all partitions/keeps all of them loaded (200 by default for me). This is just an alternative I use sometimes instead of doing a collect() operation (not a huge fan of collecting to the driver unless I absolutely have to). – murtihash Jun 12 '20 at 11:28
  • Hello again @murtihash, I agree about `partition number == 200`, but what I mean is that when you project data in a window, it will be repartitioned based on the window specification, i.e. one partition in this case. `Window().partitionBy(...)` produces 200 partitions by default as well, although the functions being used, i.e. last, max etc., will all be called in only one partition, which is the whole dataset – abiratsis Jun 12 '20 at 12:34
  • @abiratsis True, I do see your point, and I would suggest to OP to try both with window and with collect and see which works best for him. Thanks for pointing it out, cheers – murtihash Jun 12 '20 at 12:48

You can do this with a udf.

from pyspark.sql.types import ArrayType,IntegerType
from pyspark.sql.functions import collect_list,col,create_map,udf,countDistinct,lit

#UDF
def get_rating_list(ratings_arr,num_items):
    ratings_list = [0]*num_items
    for map_elem in ratings_arr:
        for k,v in map_elem.items():
            ratings_list[k-1] = v
    return ratings_list

#1.Create a new map column with item as key and rating as value
t1 = df.withColumn('item_rating_map',create_map(col('item'),col('rating')))
#2.Group the dataframe on id and get all the maps per id into an array
grouped_df = t1.groupBy('id').agg(collect_list('item_rating_map').alias('item_ratings'))
#3.udf object
rating_list_udf = udf(get_rating_list,ArrayType(IntegerType()))
#4.Get the number of unique items
num_items = df.agg(countDistinct('item').alias('num_items')).collect()[0].num_items
#5.Apply the udf
result = grouped_df.withColumn('rating_arr',rating_list_udf(col('item_ratings'),lit(num_items)))
#result.show(20,truncate=False)

You might want to add additional logic in the udf to handle the case where there are n unique items but some item has a value greater than n, in which case you will get an IndexError (the sample data hits this: there are 6 distinct items, but item 7 exists, so ratings_list[6] is out of range).
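
One possible tweak (a sketch, not part of the original answer) is to size the list by the maximum item number rather than the distinct count, which also matches the expected output length in the question, and to guard against out-of-range keys:

from pyspark.sql.functions import max as spark_max

#size the list by the largest item number instead of the distinct item count
num_items = df.agg(spark_max('item').alias('num_items')).collect()[0].num_items

def get_rating_list(ratings_arr,num_items):
    ratings_list = [0]*num_items
    for map_elem in ratings_arr:
        for k,v in map_elem.items():
            if 1 <= k <= num_items: #skip item numbers outside the list bounds
                ratings_list[k-1] = v
    return ratings_list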

Vamsi Prabhala

Here is another solution, based on the observation that max(item) == max_array_length; please let me know if that assumption is invalid.

from pyspark.sql.functions import expr, collect_list, max, sequence, lit

# max item implies max array length
maxi = df.select(max("item").alias("maxi")).first()["maxi"]

df = df.groupBy("id").agg(
      collect_list("item").alias("items"),
      collect_list("rating").alias("ratings")
).withColumn("idx", sequence(lit(1), lit(maxi)))

# we are projecting an array[K] into an array[N] where K <= N
rating_expr = expr("""transform(idx, i -> if(array_position(items, i) >= 1,
                                             ratings[array_position(items, i) - 1],
                                             0))""")

df.select(df.id, rating_expr.alias("rating_list")).show(truncate=False)

# +---+---------------------+
# |id |rating_list          |
# +---+---------------------+
# |1  |[5, 4, 0, 5, 0, 0, 3]|
# |2  |[0, 5, 5, 0, 3, 0, 0]|
# +---+---------------------+

Analysis: iterate over idx; if the current item i exists in items, use its (1-based) position to retrieve the corresponding rating from ratings with ratings[array_position(items, i) - 1], otherwise return 0.
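
As a quick illustration of why the - 1 offset is needed (array_position is 1-based, while [] element access on an array is 0-based), here is a small check, assuming an active SparkSession named spark; the demo dataframe exists only for this example:

from pyspark.sql.functions import expr

#one row holding the items/ratings arrays for id 1
demo = spark.createDataFrame([([1, 2, 4, 7], [5, 4, 5, 3])], ["items", "ratings"])

demo.select(expr("array_position(items, 4)").alias("pos"),             # returns 3 (1-based)
            expr("ratings[array_position(items, 4) - 1]").alias("r")   # returns 5 (0-based access)
           ).show()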

abiratsis