
I have a PySpark dataframe with two columns, ID and Elements. The "Elements" column holds a list (array) in each row. It looks like this:

ID | Elements
_______________________________________
X  |[Element5, Element1, Element5]
Y  |[Element Unknown, Element Unknown, Element_Z]

I want to add a column containing the most frequent element of the 'Elements' column for each row. The output should look like this:

ID | Elements                                           | Output_column 
__________________________________________________________________________
X  |[Element5, Element1, Element5]                      | Element5
Y  |[Element Unknown, Element Unknown, Element_Z]       | Element Unknown 
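For reference, a minimal sketch of how a sample dataframe like this might be built (literal values taken from the tables above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows copied from the tables above
df = spark.createDataFrame(
    [
        ("X", ["Element5", "Element1", "Element5"]),
        ("Y", ["Element Unknown", "Element Unknown", "Element_Z"]),
    ],
    ["ID", "Elements"],
)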

How can I do that using pyspark?

Thanks in advance.

Droid-Bird

2 Answers


We can use higher-order functions here (transform and aggregate are available from Spark 2.4+; the comparator form of array_sort used below needs Spark 3.0+).

  1. First use transform and aggregate to get counts for each distinct value in the array.
  2. Then sort the array of structs in descending order of count and take the first element.

from pyspark.sql import functions as F

# Count occurrences of each distinct value, then zip values and counts into an array of structs
temp = (df.withColumn("Dist", F.array_distinct("Elements"))
          .withColumn("Counts", F.expr("""transform(Dist, x ->
                          aggregate(Elements, 0, (acc, y) -> IF(y = x, acc + 1, acc)))"""))
          .withColumn("Map", F.arrays_zip("Dist", "Counts"))
       ).drop("Dist", "Counts")

# Sort the structs by count descending and take the value of the first one
out = temp.withColumn("Output_column",
                      F.expr("""element_at(array_sort(Map, (first, second) ->
                          CASE WHEN first['Counts'] > second['Counts'] THEN -1 ELSE 1 END), 1)['Dist']"""))

Output:

Note that I have added a row with an empty array for ID Z to test that edge case. You can also drop the Map column by adding .drop("Map") to the output.

out.show(truncate=False)

+---+---------------------------------------------+--------------------------------------+---------------+
|ID |Elements                                     |Map                                   |Output_column  |
+---+---------------------------------------------+--------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |[{Element5, 2}, {Element1, 1}]        |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|[{Element Unknown, 2}, {Element_Z, 1}]|Element Unknown|
|Z  |[]                                           |[]                                    |null           |
+---+---------------------------------------------+--------------------------------------+---------------+

For lower versions, you can use a UDF with statistics.mode:

from pyspark.sql import functions as F,types as T
from statistics import mode
u = F.udf(lambda x: mode(x) if len(x)>0 else None,T.StringType())

df.withColumn("Output",u("Elements")).show(truncate=False)
+---+---------------------------------------------+---------------+
|ID |Elements                                     |Output         |
+---+---------------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|Element Unknown|
|Z  |[]                                           |null           |
+---+---------------------------------------------+---------------+
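One caveat with statistics.mode: on Python versions before 3.8 it raises StatisticsError when the array has no single most common value (a tie). If that matters, a collections.Counter based UDF is a drop-in alternative (a sketch, reusing the same column names):

from collections import Counter
from pyspark.sql import functions as F, types as T

# most_common(1) returns the element with the highest count;
# ties are broken by first occurrence instead of raising an error
u2 = F.udf(lambda x: Counter(x).most_common(1)[0][0] if x else None, T.StringType())

df.withColumn("Output", u2("Elements")).show(truncate=False)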
anky

You can achieve this with PySpark SQL functions (the Python lambda forms of transform and aggregate used below require Spark 3.1+). Here is a generic function that adds a new column containing the most common element of another array column:

import pyspark.sql.functions as sf

def add_most_common_val_in_array(df, arraycol, drop=False):
    """Takes a spark df column of ArrayType() and returns the most common element
    in the array in a new column of the df called f"MostCommon_{arraycol}"

    Args:
        df (spark.DataFrame): dataframe
        arraycol (ArrayType()): array column in which you want to find the most common element
        drop (bool, optional): Drop the arraycol after finding most common element. Defaults to False.

    Returns:
        spark.DataFrame: df with additional column containing most common element in arraycol
    """
    dvals = f"distinct_{arraycol}"
    dvalscount = f"distinct_{arraycol}_count"
    startcols = df.columns
    df = df.withColumn(dvals, sf.array_distinct(arraycol))
    df = df.withColumn(
        dvalscount,
        sf.transform(
            dvals,
            lambda uval: sf.aggregate(
                arraycol,
                sf.lit(0),
                lambda acc, entry: sf.when(entry == uval, acc + 1).otherwise(acc),
            ),
        ),
    )
    countercol = f"ReverseCounter{arraycol}"
    df = df.withColumn(countercol, sf.map_from_arrays(dvalscount, dvals))
    mccol = f"MostCommon_{arraycol}"
    df = df.withColumn(mccol, sf.element_at(countercol, sf.array_max(dvalscount)))
    df = df.select(*startcols, mccol)
    if drop:
        df = df.drop(arraycol)
    return df
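A hypothetical usage with the sample dataframe from the question (function and column names as defined above):

result = add_most_common_val_in_array(df, "Elements")
result.show(truncate=False)

One caveat: when two distinct values tie on count, map_from_arrays receives duplicate keys, which I believe Spark 3 rejects by default unless spark.sql.mapKeyDedupPolicy is set to LAST_WIN, so ties may need extra handling.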
mik1904