
I have a data set where I need to get some counts and also the most frequent values.

An example data frame is below.

from pyspark.sql import Row, functions as F
row = Row("UK_1", "UK_2", "Request_Date", "Approval_Date", "Cat", "Country", "State")
test_df = sc.parallelize([
    row(1,1,'12/10/2016','10-10-2017',"A",'USA','NY'),
    row(1,2,None,'10-1-2016','A','CAN','QC'),
    row(2,1,'14/10/2016',None,'B','USA','FL'),
    row('A',3,'!~2016/2/276','Bad Date','B','USA',None),
    row(None,1,'26/09/2016','26/11/08','A',None,'ON'),
    row(1,1,'12/10/2016','22-02-20','A',None,None),
    row(1,2,None,'45/45/00','A','MEX','XPZ'),
    row(2,1,'14/10/2016','None','B','DEU','DUS'),
    row(None,None,'!~2016/2/276','12-01-2015','B','',''),
    row(None,1,'26/09/2016',None,'A','USA','CA')
]).toDF()
test_df.show()

(image: output of test_df.show())

I have some sample code, but it is not complete.

(
    test_df
    .agg(
        F.count('*').alias('count'),
        F.countDistinct('Country').alias('distinct_country')
        # .alias('top_2_countries')
    )
    .show()
)

The expected results are like below.

(image: expected output, a single row with the columns count, distinct_country, and top_2_countries)

How can I get this done?


1 Answer


The null values in your DataFrame are causing problems for the aggregation. One option is to replace them with a non-null placeholder before aggregating.

For instance:

new_df = test_df.withColumn(
    "Country",
    F.when(
        F.isnull("Country"),
        "None"
    ).otherwise(F.col("Country"))
)

This returns a DataFrame where the null values of the Country column have been replaced with the string "None". (I purposely avoided using the string "null" to avoid ambiguity.)
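As an aside, the same replacement can be written more compactly with DataFrame.fillna (a sketch equivalent to the when/isnull version above; fillna only touches null values in the listed columns, and since Country is a string column the string fill applies):

new_df = test_df.fillna("None", subset=["Country"])  # replace nulls in Country with the string "None"

Either way, non-null values, including the empty string, are left untouched.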

Now you can get the counts and rank each of the Countries by their frequencies using pyspark.sql.functions.rank() and a pyspark.sql.Window.

from pyspark.sql import Window
new_df.groupBy("Country")\
    .agg(
        F.count("Country").alias("Count"),
        F.rank().over(Window.orderBy(F.count("Country").desc())).alias("Rank")
    )\
    .show()
#+-------+-----+----+
#|Country|Count|Rank|
#+-------+-----+----+
#|    USA|    4|   1|
#|   None|    2|   2|
#|    MEX|    1|   3|
#|       |    1|   3|
#|    DEU|    1|   3|
#|    CAN|    1|   3|
#+-------+-----+----+

As you can see, "None" shows up in the County column because of the replacement. At this point, you have everything you need to compute the desired aggregations.

  • The first output column (count) is simply the sum of the Count column.
  • The second output column (distinct_country) is computed similarly to how you were doing so in your post.
  • The final output column (top_2_countries) can be computed by using pyspark.sql.functions.collect_list(), filtering for values where the rank <= 2.

For example:

new_df.groupBy("Country")\
    .agg(
        F.count("Country").alias("Count"),
        F.rank().over(Window.orderBy(F.count("Country").desc())).alias("Rank")
    )\
    .agg(
        F.sum("Count").alias("count"),
        F.countDistinct("Country").alias("distinct_country"),
        F.collect_list(F.when(F.col("rank")<=2, F.col("Country"))).alias("top_2_countries")
    )\
    .show()
#+-----+----------------+---------------+
#|count|distinct_country|top_2_countries|
#+-----+----------------+---------------+
#|   10|               6|    [USA, None]|
#+-----+----------------+---------------+

Note two things here. First, distinct_country is 6, as opposed to 5 in your example; the 5 is a result of nulls being ignored by countDistinct(). Similarly, the top_2_countries column has the values [USA, None].
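To see where the 6 comes from, here is a quick check (a sketch using the test_df and new_df defined above): countDistinct() skips nulls in the original DataFrame but counts the substituted "None" string in the modified one.

# Original DataFrame: nulls in Country are ignored by countDistinct()
test_df.select(F.countDistinct("Country").alias("distinct_country")).show()
#+----------------+
#|distinct_country|
#+----------------+
#|               5|
#+----------------+

# After the replacement: "None" is just another string value
new_df.select(F.countDistinct("Country").alias("distinct_country")).show()
#+----------------+
#|distinct_country|
#+----------------+
#|               6|
#+----------------+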

For demonstration purposes, here is what would have happened if you had converted the "None" values back into null:

new_df.groupBy("Country")\
    .agg(
        F.count("Country").alias("Count"),
        F.rank().over(Window.orderBy(F.count("Country").desc())).alias("Rank")
    )\
    .withColumn(
        "Country",
        F.when(F.col("Country") == "None", None).otherwise(F.col("Country"))
    )\
    .agg(
        F.sum("Count").alias("count"),
        F.countDistinct("Country").alias("distinct_country"),
        F.collect_list(F.when(F.col("rank")<=2, F.col("Country"))).alias("top_2_countries")
    )\
    .show()
#+-----+----------------+---------------+
#|count|distinct_country|top_2_countries|
#+-----+----------------+---------------+
#|   10|               5|          [USA]|
#+-----+----------------+---------------+

As you can see, the distinct count is 5, but the top_2_countries column does not contain null. This is because null values are excluded from collect_list()¹.

¹It is important to note that I have exploited this fact in calling collect_list(). From the docs for pyspark.sql.functions.when():

If Column.otherwise() is not invoked, None is returned for unmatched conditions.
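For a concrete illustration of both behaviours, here is a minimal sketch (not part of the original answer; it builds a small throwaway DataFrame using the Row class imported earlier): when() without otherwise() yields null for the unmatched rows, and collect_list() silently drops those nulls.

demo = sc.parallelize([Row(Country="USA"), Row(Country="CAN"), Row(Country=None)]).toDF()
demo.select(
    F.collect_list(
        F.when(F.col("Country") == "USA", F.col("Country"))  # null for "CAN" and for the null row
    ).alias("collected")
).show()
#+---------+
#|collected|
#+---------+
#|    [USA]|
#+---------+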
