
Is it possible to add a new column based on the maximum (i.e. most frequent value) of the previous columns, where the previous columns are string literals? Consider the following dataframe:

df = spark.createDataFrame(
    [
        ('1',25000,"black","black","white"),
        ('2',16000,"red","black","white"),
    ],
    ['ID','cash','colour_body','colour_head','colour_foot']
)

Then the target dataframe should look like this:

df = spark.createDataFrame(
    [
        ('1',25000,"black","black","white", "black" ),
        ('2',16000,"red","black","white", "white" ),
    ],
    ['ID','cash','colour_body','colour_head','colour_foot', 'max_v']
)

If there is no maximum detectable, then the last valid colour should be used.

Is there some kind of counting approach available, or a udf?


2 Answers


Define a UDF around statistics.mode to compute the row-wise mode with the required semantics:

import statistics

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def mode(*x):
    try:
        return statistics.mode(x)
    except statistics.StatisticsError:
        # No unique mode (e.g. all three colours differ):
        # fall back to the last valid colour, as the question requires.
        # Note: on Python 3.8+, statistics.mode no longer raises on ties
        # and instead returns the first mode encountered, so this fallback
        # only triggers on older Python versions.
        return x[-1]

mode_udf = udf(mode, StringType())

df.withColumn("max_v", mode_udf(*[col(c) for c in df.columns if 'colour' in c])).show()

+---+-----+-----------+-----------+-----------+-----+
| ID| cash|colour_body|colour_head|colour_foot|max_v|
+---+-----+-----------+-----------+-----------+-----+
|  1|25000|      black|      black|      white|black|
|  2|16000|        red|      black|      white|white|
+---+-----+-----------+-----------+-----------+-----+
  • This is a case where using a `udf` is better than the API functions. – pault Jun 04 '19 at 16:51
  • @pault I did my due diligence in research and read about 30 links but could not find any useful info on row-wise mode. PySpark is severely limiting after being used to pandas and simple things like `mode(axis=1)` :P – cs95 Jun 04 '19 at 16:53
  • It *can* be done: it would require an `explode` (or maybe a `posexplode`) followed by a `count` and then a `rank` using two separate `Window` functions (sketched below these comments). – pault Jun 04 '19 at 16:56
  • This is coming from someone who [takes it as a challenge](https://stackoverflow.com/questions/53090003/split-string-in-a-spark-dataframe-column-by-regular-expressions-capturing-groups/53090509#comment93077913_53090272) to find solutions using the API functions... – pault Jun 04 '19 at 17:02
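
For reference, here is a rough sketch of the `posexplode` + `Window` approach pault outlines in the comments above. It is untested and reuses `df` and the colour columns from the question; the tie-break on position is one way to implement the "use the last valid colour" rule.

from pyspark.sql import Window
from pyspark.sql.functions import array, col, count, posexplode, row_number

colour_cols = [c for c in df.columns if 'colour' in c]

# One row per (ID, position, colour value)
exploded = df.select(
    "ID",
    posexplode(array(*[col(c) for c in colour_cols])).alias("pos", "colour"),
)

# cnt = how many times each colour occurs within a row
counted = exploded.withColumn(
    "cnt", count("*").over(Window.partitionBy("ID", "colour"))
)

# Rank colours per row: most frequent first, ties broken by the
# last (highest) position, i.e. the last valid colour wins
best = counted.withColumn(
    "rn",
    row_number().over(
        Window.partitionBy("ID").orderBy(col("cnt").desc(), col("pos").desc())
    ),
).filter(col("rn") == 1).select("ID", col("colour").alias("max_v"))

df.join(best, on="ID").show()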

For the general case of any number of columns, the udf solution by @cs95 is the way to go.

However, in this specific case where you have only 3 columns, you can actually simplify the logic using just pyspark.sql.functions.when, which will be more efficient than using a udf.

from pyspark.sql.functions import col, when

def mode_of_3_cols(body, head, foot):
    # If body matches either other column, body must be the mode.
    # Failing that, head == foot makes head the mode.
    # Otherwise there is no mode, so fall back to the last column.
    return (
        when((body == head) | (body == foot), body)
        .when(head == foot, head)
        .otherwise(foot)
    )

df.withColumn(
    "max_v", 
    mode_of_3_cols(col("colour_body"), col("colour_head"), col("colour_foot"))
).show()
#+---+-----+-----------+-----------+-----------+-----+
#| ID| cash|colour_body|colour_head|colour_foot|max_v|
#+---+-----+-----------+-----------+-----------+-----+
#|  1|25000|      black|      black|      white|black|
#|  2|16000|        red|      black|      white|white|
#+---+-----+-----------+-----------+-----------+-----+

You just need to check whether any two columns are equal: if so, that value has to be the mode. If not, return the last column.

  • Actually you can simplify the logic to remove the second `when` condition, because you will return `foot` in either case (see the sketch below), but I will leave it in case someone wants to return a different default value. – pault Jun 06 '19 at 16:30
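
For completeness, the simplified version pault describes in that comment would look like this (same behaviour, one fewer branch):

from pyspark.sql.functions import when

def mode_of_3_cols(body, head, foot):
    # If body matches neither other column, the answer is foot
    # whether or not head == foot, so the second when() is redundant.
    return when((body == head) | (body == foot), body).otherwise(foot)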