0

My data consists of multiple columns and it looks something like this:

enter image description here

I would like to group the data for each column separately and count number of occurrences of each element, which I can achieve by doing this:

df.groupBy("Col-1").count() 

df.groupBy("Col-2").count()

df.groupBy("Col-n").count()

However, if there are 1000 of columns, it my be time consuming. So I was trying to find the another way to do it:

At the moment what I have done so far:

def mapFxn1(x):
    vals=[1] * len(x)
    c=tuple(zip(list(x), vals))
    return c

df_map=df.rdd.map(lambda x: mapFxn1(x))

mapFxn1 takes each row and transforms it into tuple of tuples: so basically row one would look like this: ((10, 1), (2, 1), (x, 1))

I am just wondering how one can used reduceByKey on df_map with the lambda x,y: x + y in order to achieve the grouping on each of columns and counting the occurrences of elements in each of the columns in single step.

Thank you in advance

Alper t. Turker
  • 34,230
  • 9
  • 83
  • 115
user1275607
  • 99
  • 1
  • 10

1 Answers1

1

With cube:

df = spark.createDataFrame(
    [(3, 2), (2, 1), (3, 8), (3, 9), (4, 1)]
).toDF("col1", "col2")
df.createOrReplaceTempView("df")

spark.sql("""SELECT col1, col2, COUNT(*) 
             FROM df GROUP BY col1, col2 GROUPING SETS(col1, col2)"""
).show()

# +----+----+--------+
# |col1|col2|count(1)|
# +----+----+--------+
# |null|   9|       1|
# |   3|null|       3|
# |null|   1|       2|
# |null|   2|       1|
# |   2|null|       1|
# |null|   8|       1|
# |   4|null|       1|
# +----+----+--------+

With melt:

melt(df, [], df.columns).groupBy("variable", "value").count().show()

# +--------+-----+-----+
# |variable|value|count|
# +--------+-----+-----+
# |    col2|    8|    1|
# |    col1|    3|    3|
# |    col2|    2|    1|
# |    col1|    2|    1|
# |    col2|    9|    1|
# |    col1|    4|    1|
# |    col2|    1|    2|
# +--------+-----+-----+

With reduceByKey

from operator import add

counts = (df.rdd
   .flatMap(lambda x: x.asDict().items())
   .map(lambda x: (x, 1))
   .reduceByKey(add))

counts.toLocalIterator():
    print(x)
#     
# (('col1', 2), 1)
# (('col2', 8), 1)
# (('col2', 1), 2)
# (('col2', 9), 1)
# (('col1', 4), 1)
# (('col1', 3), 3)
# (('col2', 2), 1)
Alper t. Turker
  • 34,230
  • 9
  • 83
  • 115
  • Thank you very much for your quick reply. It seems to work. I was particularly interested in reduceByKey approach. Although it's working as expected. The performance is the issue. I have 50M rows and 60 columns. It took almost more than 10 minutes on my 8 core machine. However, if I do same task with python and numpy ( simply passing through the file 10K lines at time and applying the numpy hist function and then updating the counts) it took less than 3 minutes on single core. Is this normal behavior? – user1275607 Jan 28 '18 at 11:01