1

I need to find the sum of row values for around 900 column I applied the function in this link Spark - Sum of row values

from functools import reduce

def superSum(*cols):
   return reduce(lambda a, b: a + b, cols)

add = udf(superSum)

df.withColumn('total', add(*[df[x] for x in df.columns])).show()

but i got this error

Py4JJavaError: An error occurred while calling o1005.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "***********\pyspark\worker.py", line 218, in main
  File "***********\pyspark\worker.py", line 147, in read_udfs
  File "<string>", line 1
SyntaxError: more than 255 arguments
Ahmad Senousi
  • 613
  • 2
  • 12
  • 24
  • 1
    Possible duplicate of [Pass more than 255 arguments to a function](https://stackoverflow.com/questions/26766076/pass-more-than-255-arguments-to-a-function) – pault Jul 06 '18 at 13:47

1 Answers1

2

I give same error superSum functions but code below is worked, I guess udf functions not working with more than 255 arguments. python3

import operator
from functools import reduce
import findspark
findspark.init() # replace with your spark path
from pyspark import SparkConf, SparkContext

from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import Row

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)


df = sqlContext.createDataFrame([
    Row(**{str(i):0 for i in range(300)})
])

df \
    .withColumn('total', reduce(operator.add, map(F.col, df.columns))).show()
hamza tuna
  • 1,467
  • 1
  • 12
  • 17
  • @ hamza tuna after apply your code and save the resulting DF I obtained this error '--------------------------------------------------------------------------- IndexError Traceback (most recent call last) ********\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in _get_connection(self) 851 try: --> 852 connection = self.deque.pop() 853 except IndexError: IndexError: pop from an empty deque Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:54179) ' – Ahmad Senousi Jul 06 '18 at 08:54
  • I did that and restart my PC and this action still appear and also get the following message 'ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it' – Ahmad Senousi Jul 06 '18 at 09:09
  • pyspark --num-executors 20 --executor-memory 12G --executor-cores 5 --driver-memory 16G --driver-cores 3 – Ahmad Senousi Jul 06 '18 at 09:26
  • updated answer. I put all code and its working correctly. Are you sure to you have running cluster. You checked your spark web ui? You really have 20 executors with 12 G ram. This means you have least 20 computer with 16 G ram – hamza tuna Jul 06 '18 at 09:30
  • I created spark session as follow 'SparkSession.builder\ .master("local")\ .appName("my_app")\ .config("spark.some.config.option", "some-value")\ .getOrCreate() – Ahmad Senousi Jul 06 '18 at 09:33
  • concerning the pyspark configuration. I work on a server and I faced some proplem like OOM error and after wrote this i didn't counter OOM yet – Ahmad Senousi Jul 06 '18 at 09:37
  • Can you try without --num-executors 20 --executor-memory 12G --executor-cores 5 --driver-memory 16G --driver-cores 3 – this – hamza tuna Jul 06 '18 at 09:37