5

I have this PySpark DataFrame

df = pd.DataFrame(np.array([
    ["aa@gmail.com",2,3], ["aa@gmail.com",5,5],
    ["bb@gmail.com",8,2], ["cc@gmail.com",9,3]
]), columns=['user','movie','rating'])

sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)
         user movie rating
aa@gmail.com     2      3
aa@gmail.com     5      5
bb@gmail.com     8      2
cc@gmail.com     9      3

I need to add a new column with a Rank by User

I want have this output

         user  movie rating  Rank
aa@gmail.com     2      3     1
aa@gmail.com     5      5     1
bb@gmail.com     8      2     2
cc@gmail.com     9      3     3

How can I do that?

ZygD
  • 22,092
  • 39
  • 79
  • 102
Kardu
  • 865
  • 3
  • 13
  • 24

1 Answers1

12

There is really no elegant solution here as for now. If you have to you can try something like this:

lookup = (sparkdf.select("user")
    .distinct()
    .orderBy("user")
    .rdd
    .zipWithIndex()
    .map(lambda x: x[0] + (x[1], ))
    .toDF(["user", "rank"]))

sparkdf.join(lookup, ["user"]).withColumn("rank", col("rank") + 1)

Window functions alternative is much more concise:

from pyspark.sql.functions import dense_rank

sparkdf.withColumn("rank", dense_rank().over(w))

but it is extremely inefficient and should be avoided in practice.

zero323
  • 322,348
  • 103
  • 959
  • 935
  • Is there any references to why window functions should be avoided in practice? – Oleksiy Apr 15 '16 at 17:31
  • @Oleksiy Rule of thumb -> No `partitionBy` clause or low cardinality of the partition key. I haven't checked how recent changes on master (2.0.0+) affect that, but I am quite sure that not much. – zero323 Apr 15 '16 at 17:35
  • @zero323 -thank you for the answer. Could you explain why it is inefficient and what three lines below do: .zipWithIndex() .map(lambda x: x[0] + (x[1], )) .toDF(["user", "rank"])) – BI Dude Apr 22 '20 at 16:02