I'm trying to use Spark DataFrames instead of RDDs, since they are higher-level than RDDs and tend to produce more readable code.

On a 14-node Google Dataproc cluster, I have about 6 million names that are translated to IDs by two different systems: sa and sb. Each Row contains name, id_sa and id_sb. My goal is to produce a mapping from id_sa to id_sb such that, for each id_sa, the corresponding id_sb is the most frequent ID among all names attached to that id_sa.

Let's try to clarify with an example. If I have the following rows:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

My goal is to produce a mapping from a1 to b2. Indeed, the names associated with a1 are n1, n2 and n3, which map to b1, b2 and b2 respectively, so b2 is the most frequent mapping among the names associated with a1. In the same way, a2 will be mapped to b2. It's OK to assume that there will always be a winner: no need to break ties.

I was hoping I could use groupBy(df.id_sa) on my DataFrame, but I don't know what to do next. Ideally, I'm looking for an aggregation that would produce, in the end, the following rows:

[Row(id_sa='a1', max_id_sb='b2'),
 Row(id_sa='a2', max_id_sb='b2')]

But maybe I'm trying to use the wrong tool and I should just go back to using RDDs.
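For reference, a minimal sketch of how the example above can be built as a DataFrame (assuming Spark 2.0+ and a SparkSession; the answers below refer to it as df):

from pyspark.sql import Row, SparkSession

# hypothetical setup: create or reuse a SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(name='n1', id_sa='a1', id_sb='b1'),
    Row(name='n2', id_sa='a1', id_sb='b2'),
    Row(name='n3', id_sa='a1', id_sb='b2'),
    Row(name='n4', id_sa='a2', id_sb='b2'),
])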

– Quentin Pradet

3 Answers

Using join (in case of ties, it will keep more than one row per group):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

# count how many names support each (id_sa, id_sb) pair
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
# find the highest pair count per id_sa
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

# keep the pairs whose count equals the per-group maximum
cnts.join(maxs,
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
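On the example data above, this yields the expected mapping (row order may vary):

+-----+-----+
|id_sa|id_sb|
+-----+-----+
|   a1|   b2|
|   a2|   b2|
+-----+-----+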

Using window functions (ties are dropped, since row_number keeps exactly one row per partition):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# rank each id_sb within its id_sa partition by descending count
w = Window.partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

Using struct ordering (structs compare field by field, so the max of struct(cnt, id_sb) selects the highest cnt, with id_sb as the tie-breaker):

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  # the max struct carries the winning id_sb along with its count
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))
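The struct comparison here is lexicographic, exactly like Python tuple comparison, which is what lets the max carry the winning id_sb along. A quick illustration of the analogy:

# structs compare like Python tuples: first field first, then the next
(2, "b2") > (1, "b1")   # True: 2 > 1, the second field is never consulted
(2, "b2") > (2, "b1")   # True: counts tie, so "b2" > "b1" decides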

See also How to select the first row of each group?

– zero323
  • You mind clarifying a little more how `.agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))` works? Does it resolve using something like `row_number().over(struct(col("cnt"), col("id_sb")))`? – R7L208 Sep 03 '21 at 15:53

I think what you might be looking for are window functions: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Here is an example in Scala (I don't have a Spark Shell with Hive available right now, so I was not able to test the code, but I think it should work):

case class MyRow(name: String, id_sa: String, id_sb: String)

val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"),
    MyRow("n2", "a1", "b2"),
    MyRow("n3", "a1", "b2"),
    MyRow("n4", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.first

// note: ordering by id_sb picks the lexicographically largest id_sb per id_sa;
// to pick the most frequent one, order by a count column instead
val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)

// name the new column consistently so the filter can reference it
myDF.withColumn("max_id_sb", first(myDF("id_sb")).over(windowSpec)).filter("id_sb = max_id_sb")

There are probably more efficient ways to achieve the same results with Window functions, but I hope this points you in the right direction.
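For reference, a rough, untested PySpark sketch of the same window idea, adjusted to order by frequency (a cnt column) rather than by id_sb so it matches the goal in the question; it assumes the original DataFrame is named df:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank (id_sa, id_sb) pairs by how many names support them
w = Window.partitionBy("id_sa").orderBy(F.col("cnt").desc())

(df.groupBy("id_sa", "id_sb")
   .agg(F.count("*").alias("cnt"))
   .withColumn("rn", F.row_number().over(w))
   .where(F.col("rn") == 1)
   .select("id_sa", F.col("id_sb").alias("max_id_sb")))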

– alghimo

In Spark 3.2+, you can use the pandas API on Spark:

dd1 = df1.pandas_api()
# count how many names support each (id_sa, id_sb) pair
dd1['col1'] = dd1.groupby(["id_sa", "id_sb"])['name'].transform(lambda ss: ss.count())
# within each id_sa, keep the row with the highest pair count
result = (dd1.groupby("id_sa")
             .apply(lambda dd: dd.sort_values("col1", ascending=False).head(1))
             .reset_index(drop=True)
             .drop("col1", axis=1))
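If you need a plain Spark DataFrame again afterwards, the pandas-on-Spark result can be converted back:

# convert the pandas-on-Spark result back to a regular Spark DataFrame
result.to_spark().show()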
– G.G