47

I'm using PySpark and I have a Spark dataframe with a bunch of numeric columns. I want to add a column that is the sum of all the other columns.

Suppose my dataframe had columns "a", "b", and "c". I know I can do this:

df.withColumn('total_col', df.a + df.b + df.c)

The problem is that I don't want to type out each column individually and add them, especially if I have a lot of columns. I want to be able to do this automatically or by specifying a list of column names that I want to add. Is there another way to do this?

Paul
plam
  • This is much easier with RDDs than dataframes e.g. if data is an array representing a row, then you can do `RDD.map(lambda data: (data, sum(data)))`. The main reason this is more difficult with a spark dataframe is figuring out what is allowed as a column expression in `withColumn`. It doesn't seem to be very well documented. – Paul Aug 12 '15 at 03:36
  • Neither of these seems to work for me (PySpark 1.6.3): `dftest.withColumn("times", sum((dftest[c] > 2).cast("int") for c in dftest.columns[1:]))` nor `dftest.select('a', 'b', 'c', 'd').rdd.map(lambda x: (x, sum(x))).take(2)`. – Abhinav Sood Sep 04 '17 at 01:43

8 Answers

57

This was not obvious. I see no row-based sum of the columns defined in the Spark DataFrames API.

Version 2

This can be done in a fairly simple way:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns is supplied by PySpark as a list of strings giving all of the column names in the Spark DataFrame. For a different sum, you can supply any other list of column names instead.
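For example, a minimal sketch that sums only a subset of the columns (the built-in sum works here because it starts at 0 and 0 + Column falls back to Column.__radd__):

cols_to_sum = ['a', 'b']   # any subset of df.columns
newdf = df.withColumn('partial_total', sum(df[c] for c in cols_to_sum))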

I did not try this as my first solution because I wasn't certain how it would behave. But it works.

Version 1

This is overly complicated, but works as well.

You can do this:

  1. use df.columns to get a list of the column names
  2. use that list of names to build a list of the columns
  3. pass that list to something that will invoke the column's overloaded add function in a fold-type functional manner

With Python's reduce, some knowledge of how operator overloading works, and the pyspark code for columns here, that becomes:

from functools import reduce  # needed on Python 3; built in on Python 2

def column_add(a, b):
    return a.__add__(b)

newdf = df.withColumn('total_col',
                      reduce(column_add, (df[col] for col in df.columns)))

Note this is a Python reduce, not a Spark RDD reduce, and the second argument to reduce needs its own parentheses because it is a generator expression.
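For reference, a minimal equivalent sketch that uses operator.add instead of the hand-written helper (assuming Python 3, where reduce lives in functools):

from functools import reduce
from operator import add

# add(a, b) is just a + b, which resolves to Column.__add__ for Spark columns
newdf = df.withColumn('total_col', reduce(add, (df[col] for col in df.columns)))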

Tested, Works!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
Paul
  • @Salmonerd Thanks. It helps sometimes to remember the spark dataframe class is immutable, and so to make any changes in the data you have to call something that returns a new dataframe. – Paul Aug 13 '15 at 03:58
  • 5
    Version 2 is not working with Spark 1.5.0 and CDH-5.5.2 and Python version 3.4. It is throwing an error: "AttributeError: 'generator' object has no attribute '_get_object_id'" – Hemant May 31 '16 at 09:08
  • Both of your solutions are nice and neat. I am wondering why you did not use user defined functions for this? – FairyOnIce Nov 29 '16 at 17:43
  • 1
    @Paul I used the VERSION 2 method to add up several columns with "bigint" type. Somehow, I got this error: 'generator' object has no attribute '_get_object_id'. Do you know why this would happen? Thank you! – Elsa Li May 03 '18 at 23:04
  • @QianLi No idea. – Paul May 03 '18 at 23:07
  • @QianLi try writing up a reproducible example and post a new question – Paul May 03 '18 at 23:09
  • @QianLi were you able to solve the `error: 'generator' object has no attribute '_get_object_id'` error? – Rahul Chawla May 16 '18 at 12:06
  • @RahulChawla Unfortunately, this error is not solved yet. Do you have any suggestions? – Elsa Li May 27 '18 at 15:41
  • @QianLi still looking for it. – Rahul Chawla Jun 05 '18 at 08:37
  • 5
    Version 2 doesn't work. Throws a `TypeError: 'Column' object is not callable` – Augmented Jacob Sep 24 '18 at 19:42
  • 3
    Version 1 doesn't work for me, get an error "Column is not iterable" – Vincent Chalmel Sep 25 '18 at 15:53
  • I suggest posting a new question if you are having trouble. Obviously there have been newer versions of Spark since this was posted in 2015, back when it did solve the OP's problem. – Paul Sep 26 '18 at 23:01
  • @VincentChalmel No errors in Version 1, after initializing `pyspark.SparkContext` and `pyspark.sql.SparkSession` to make sure `.toDF()` is available on RDDs, and also importing `reduce` from `functools`. This is with pyspark 2.3.1. To duplicate my jupyter+spark environment, use this docker command: `docker run -it -p 8888:8888 jupyter/all-spark-notebook:latest` – Paul Oct 01 '18 at 10:20
  • @AugmentedJacob No idea why you're getting that. Confirmed it still works for me in 2.3.1. See the comment above for the docker command to duplicate the environment. – Paul Oct 01 '18 at 10:21
  • @Paul: Mine throws the error `AssertionError: col should be Column` (Spark 2.2) – absolutelydevastated Oct 04 '18 at 01:59
  • I'm having no problem using this solution's Version 2. `sum` and garden variety plus (`+`) operator work fine. I'm using Spark 2.4 – Wassadamo Jan 05 '23 at 01:08
21

The most straightforward way of doing it is to use the expr function:

from pyspark.sql.functions import expr

data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))
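An equivalent sketch, assuming the same hypothetical column names, uses selectExpr so no separate import is needed:

data = data.selectExpr('*', 'col1 + col2 + col3 + col4 as total')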
Jonathan
19

The solution

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

posted by @Paul works. Nevertheless, like many others I have seen, I was getting the error

TypeError: 'Column' object is not callable

After some time I found the problem (at least in my case): I had previously imported some PySpark functions with the line

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

so that line shadowed Python's built-in sum with PySpark's sum, while df.withColumn('total', sum(df[col] for col in df.columns)) is supposed to use the normal Python sum function.

You can delete the reference to the PySpark function with del sum.

Otherwise in my case I changed the import to

import pyspark.sql.functions as F

and then referenced the functions as F.sum.
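A minimal sketch of the aliased pattern (assuming df holds only numeric columns, as in the question), showing that the built-in sum and PySpark's aggregate sum can then coexist:

import pyspark.sql.functions as F

# built-in sum: folds the Column objects together, i.e. a per-row total
df = df.withColumn('total', sum(df[c] for c in df.columns))

# F.sum: the aggregate version, summing the new column down its rows
df.agg(F.sum('total')).show()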

roschach
13

Summing multiple columns from a list into one column

PySpark's sum function is an aggregate (it sums values down a single column), so it doesn't do column addition. That can be achieved using the expr function instead.

from pyspark.sql.functions import expr

cols_list = ['a', 'b', 'c']

# Creating an addition expression using `join`
expression = '+'.join(cols_list)

df = df.withColumn('sum_cols', expr(expression))

This gives us the desired sum of columns.
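If the list of columns isn't known up front, one possible sketch builds it from the schema and keeps only numeric columns (the type names checked here are the common ones, not an exhaustive list; adjust to your schema):

from pyspark.sql.functions import expr

# df.dtypes is a list of (column name, type string) pairs
cols_list = [name for name, dtype in df.dtypes
             if dtype in ('int', 'bigint', 'smallint', 'tinyint', 'float', 'double')
             or dtype.startswith('decimal')]

df = df.withColumn('sum_cols', expr(' + '.join(cols_list)))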

Vivek Payasi
1

My problem was similar to the above (a bit more complex), as I had to add consecutive column sums as new columns to a PySpark DataFrame. This approach uses code from Paul's Version 1 above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df = spark.createDataFrame(
    data=[(1, 2, 3), (4, 5, 6), (3, 2, 1),
          (6, 1, -4), (0, 2, -2), (6, 4, 1),
          (4, 5, 2), (5, -3, -5), (6, 4, -1)],
    schema=['x1', 'x2', 'x3'])
df.show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+

colnames = df.columns

Add new columns that are cumulative sums (consecutive):

for i in range(len(colnames)):
    colnameLst = colnames[0:i + 1]
    colname = 'cm' + str(i + 1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

df.show()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

The 'cumulative sum' columns added are as follows:

cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
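For reference, the same cumulative columns could also be added in a single select, reusing df and colnames from above (a sketch that, like the loop, relies on the built-in sum not being shadowed by pyspark.sql.functions.sum):

df = df.select('*', *[
    sum(df[c] for c in colnames[:i + 1]).alias('cm' + str(i + 1))
    for i in range(len(colnames))
])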
Grant Shannon
1
This example adds columns by positional index: df[2] is the third column, and df[3] is the column created by the previous withColumn.

df = spark.createDataFrame([("linha1", "valor1", 2), ("linha2", "valor2", 5)], ("Columna1", "Columna2", "Columna3"))

df.show()

+--------+--------+--------+
|Columna1|Columna2|Columna3|
+--------+--------+--------+
|  linha1|  valor1|       2|
|  linha2|  valor2|       5|
+--------+--------+--------+

df = df.withColumn('DivisaoPorDois', df[2]/2)
df.show()

+--------+--------+--------+--------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|
+--------+--------+--------+--------------+
|  linha1|  valor1|       2|           1.0|
|  linha2|  valor2|       5|           2.5|
+--------+--------+--------+--------------+

df = df.withColumn('Soma_Colunas', df[2]+df[3])
df.show()

+--------+--------+--------+--------------+------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|Soma_Colunas|
+--------+--------+--------+--------------+------------+
|  linha1|  valor1|       2|           1.0|         3.0|
|  linha2|  valor2|       5|           2.5|         7.5|
+--------+--------+--------+--------------+------------+
0

A very simple approach would be to just use select instead of withColumn, as below:

from pyspark.sql.functions import col

df = df.select('*', (col("a") + col("b") + col("c")).alias("total"))

This should give you the required sum, with minor changes based on your requirements.
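If there are many columns, the same select-based idea generalizes with the built-in sum over a list of column names (cols_to_add here is a hypothetical list):

from pyspark.sql.functions import col

cols_to_add = ['a', 'b', 'c']  # whatever columns you want to total
df = df.select('*', sum(col(c) for c in cols_to_add).alias('total'))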

0

The following approach works for me:

  1. Import the PySpark SQL functions
    from pyspark.sql import functions as F
  2. Use F.expr with an expression string that adds the column names
    data_frame.withColumn('Total_Sum', F.expr('col_name1 + col_name2 + ... + col_namen'))
Manjunatha H C