Ultimately what I want is the mode of a column, for all the columns in the DataFrame. For other summary statistics, I see a couple of options: use DataFrame aggregation, or map the columns of the DataFrame to an RDD of vectors (something I'm also having trouble doing) and use colStats
from MLlib. But I don't see mode as an option there.
8 Answers
A problem with mode is pretty much the same as with median. While it is easy to compute, computation is rather expensive. It can be done either using sort followed by local and global aggregations or using just-another-wordcount and filter:
import numpy as np
np.random.seed(1)
df = sc.parallelize([
(int(x), ) for x in np.random.randint(50, size=10000)
]).toDF(["x"])
cnts = df.groupBy("x").count()
mode = cnts.join(
cnts.agg(max("count").alias("max_")), col("count") == col("max_")
).limit(1).select("x")
mode.first()[0]
## 0
Either way it may require a full shuffle for each column.

- 322,348
- 103
- 959
- 935
-
It gives me error : AttributeError: 'str' object has no attribute 'alias' – Neo May 05 '17 at 20:32
-
@Ajinkya This means you're using `builtins.max` not `pyspark.sql.functions.max`. – zero323 May 06 '17 at 08:50
-
can you please tell how to handle the case where mode of a column is null/missing value. Then we should take 2nd highest ocurring value. Thanks – Neo May 08 '17 at 21:00
-
@Ajinkya Perpend this with `na.drop("column_name")` – zero323 May 09 '17 at 06:36
This line will give you the mode of "col" in spark data frame df:
df.groupby("col").count().orderBy("count", ascending=False).first()[0]
For a list of modes for all columns in df use:
[df.groupby(i).count().orderBy("count", ascending=False).first()[0] for i in df.columns]
To add names to identify which mode for which column, make 2D list:
[[i,df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]
The following method can help you to get mode of all columns of an input dataframe
from pyspark.sql.functions import monotonically_increasing_id
def get_mode(df):
column_lst = df.columns
res = [df.select(i).groupby(i).count().orderBy("count", ascending=False) for i in column_lst]
df_mode = res[0].limit(1).select(column_lst[0]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
for i in range(1, len(res)):
df2 = res[i].limit(1).select(column_lst[i]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
df_mode = df_mode.join(df2, (df_mode.temp_name_monotonically_increasing_id == df2.temp_name_monotonically_increasing_id)).drop(df2.temp_name_monotonically_increasing_id)
return df_mode.drop("temp_name_monotonically_increasing_id")

- 41
- 1
- 3
You can calculate column mode using Java code as follows:
case MODE:
Dataset<Row> cnts = ds.groupBy(column).count();
Dataset<Row> dsMode = cnts.join(
cnts.agg(functions.max("count").alias("max_")),
functions.col("count").equalTo(functions.col("max_")
));
Dataset<Row> mode = dsMode.limit(1).select(column);
replaceValue = ((GenericRowWithSchema) mode.first()).values()[0];
ds = replaceWithValue(ds, column, replaceValue);
break;
private static Dataset<Row> replaceWithValue(Dataset<Row> ds, String column, Object replaceValue) {
return ds.withColumn(column,
functions.coalesce(functions.col(column), functions.lit(replaceValue)));
}

- 13,139
- 5
- 50
- 53

- 518
- 4
- 15
-
Your code example seems to start in the middle of a 'switch' block, and have peculiar indenting. Is there something at the beginning of your example? – rwp Mar 30 '18 at 09:26
-
>>> df=newdata.groupBy('columnName').count()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
See My result
>>> newdata.groupBy('var210').count().show()
+------+-----+
|var210|count|
+------+-----+
| 3av_| 64|
| 7A3j| 509|
| g5HH| 1489|
| oT7d| 109|
| DM_V| 149|
| uKAI|44883|
+------+-----+
# store the above result in df
>>> df=newdata.groupBy('var210').count()
>>> df.orderBy(df['count'].desc()).collect()
[Row(var210='uKAI', count=44883),
Row(var210='g5HH', count=1489),
Row(var210='7A3j', count=509),
Row(var210='DM_V', count=149),
Row(var210='oT7d', count=109),
Row(var210='3av_', count=64)]
# get the first value using collect()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
>>> mode
'uKAI'
using groupBy() function getting count of each category in column. df is my result data frame has two columns var210,count. using orderBy() with column name 'count' in descending order give the max value in 1st row of data frame. collect()[0][0] is used to get the 1 tuple in data frame

- 1
- 2
First group by the column by count (I did it without counting null values), and get the maximal count value (the frequent value). Second, look for the key of the maximal count value:
from pysprak.sql import functions as F
count_mode_val = df.groupBy("column_name").count().filter(F.col("column_name").isNotNull()).agg(F.max("count")).collect()[0][0]
mode_val = df.groupBy("column_name").count().filter(F.col("column_name").isNotNull()).filter(F.col("count") == count_mode_val).select("column_name").collect()[0][0]

- 1
- 1
Use UDF as its simple and less complicated:-
It will work for both Categorical & Numeric dtypes.
from pyspark.sql.functions import col, udf, collect_list
import statistics
# define a UDF to calculate mode
def mode_udf(data):
if len(data) == 0:
return None
return statistics.mode(data) # similar for mean, median.
# register the UDF
mode_func = udf(mode_udf)
# create a sample dataframe
data = [("apple", 1), ("orange", 2), ("apple", 2), ("banana", 4), ("orange", 12), ("orange", 2), ("apple", 3), ("apple", 0), ("apple", 3),("apple", 2), ("apple", 2), ("banana", 7), ("banana", 4)]
df = spark.createDataFrame(data, ["fruit", "quantity"])
# calculate the mode for the "fruit" column
mode_df = df.groupBy("fruit").agg(mode_func(collect_list("quantity")).alias("quantity_mode"))
# show the result
mode_df.show()
Note:- Please handle None/Null values in your data, else there is a chance of getting unexpected outputs.

- 166
- 1
- 9
Spark 3.4+ has mode
:
F.mode(column)
Full example:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(2, 7),
(1, 8),
(1, 9),
(1, None),
(2, None)],
['c1', 'c2'])
df.agg(*[F.mode(c).alias(c) for c in df.columns]).show()
# +---+---+
# | c1| c2|
# +---+---+
# | 1| 7|
# +---+---+

- 22,092
- 39
- 79
- 102