
I am trying to apply an aggregate function to columns in PySpark. The context: I only have Spark 2.2 at hand and no option of using a vectorized pandas_udf.

import random

sdt = spark.createDataFrame(
    zip([random.randint(1, 100) for x in range(20)],
        [random.randint(1, 100) for x in range(20)]),
    schema=['col1', 'col2'])
+----+----+
|col1|col2|
+----+----+
|  19|  51|
|  95|  56|
|  11|  94|
|  80|  99|
|  20|  80|
|  38|  91|
|  18|  88|
|   4|  33|
+----+----+

In order to parallelize over the columns, I turn the column names into an RDD:

sdt_col_rdd = sc.parallelize(sdt.columns)

Testing works fine with an ordinary Python function that returns a pandas DataFrame:

import numpy as np
import pandas as pd

x = sdt_col_rdd.map(lambda col: (col, pd.DataFrame(np.random.randint(0, 100, size=(2, 4)), columns=list('ABCD'))))
y = x.collectAsMap()  # collect into a dictionary with the column names as keys
print(y['col1']); print(y['col2'])
    A   B   C   D
0  14  55   4  57
1  36  84  53  51
    A   B   C   D
0  14  55   4  57
1  36  84  53  51

Switching to Spark DataFrames: here is a sample function that also returns a pandas DataFrame, but it processes a Spark DataFrame and uses its native aggregations, transformations, actions, etc.:

from pyspark.sql import functions as F

def myFunc(df, c):
    # more processing, aggregations, transformations, etc. could be done here
    res = df.agg((F.min(c) - 1).alias("min_" + c), (F.max(c) + 1).alias("max_" + c)).toPandas()
    res["col_name"] = c
    return res

The function works fine on its own:

myFunc(sdt.select('col1'), 'col1')
   min_col1  max_col1 col_name
0         4       100     col1
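
Looping over the columns from the driver works too; it is just sequential, one Spark job per column, which is what I am trying to avoid. A minimal sketch:

results = {c: myFunc(sdt.select(c), c) for c in sdt.columns}  # sequential: one Spark job per column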

Issues arise when I put that inside the RDD map, similar to what was done above:

x = sdt_col_rdd.map(lambda col: (col, myFunc(sdt.select(col), col)))
y = x.collectAsMap()

Any idea how to achieve this kind of transformation/action in Spark for columns in parallel, without a UDAF? collect_list is not an option: with a huge dataset it would not be efficient, and it gives up Spark's built-in functions. The full traceback is below.

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
<ipython-input> in <module>()
      1 col_map = sdt_col_rdd.map(lambda col: (col, myFunc(sdt.select(col), col)))
----> 2 y = col_map.collectAsMap()

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in collectAsMap(self)
-> 1557         return dict(self.collect())

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in collect(self)
--> 796             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in _jrdd(self)
-> 2442                                       self._jrdd_deserializer, profiler)

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
-> 2375     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
-> 2361     pickled_command = ser.dumps(command)

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/serializers.py in dumps(self, obj)
--> 464         return cloudpickle.dumps(obj, 2)

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/cloudpickle.py in dumps(obj, protocol)
--> 704     cp.dump(obj)

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/cloudpickle.py in dump(self, obj)
--> 162             raise pickle.PicklingError(msg)

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o62.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
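
For what it's worth, the closest workaround I have found so far is to keep the loop on the driver and use plain Python threads, so the per-column Spark jobs are at least submitted concurrently and the DataFrame never has to be pickled. A rough sketch, assuming the per-column jobs are independent:

from multiprocessing.pool import ThreadPool

# Threads do not pickle their arguments, so sdt and the SparkContext stay on the driver;
# each thread just submits its own Spark job, and Spark schedules those jobs concurrently.
pool = ThreadPool(4)  # number of per-column jobs to run at once
results = dict(pool.map(lambda c: (c, myFunc(sdt.select(c), c)), sdt.columns))
pool.close()
pool.join()

Still, I would like to know whether there is a more idiomatic way to do this within Spark itself.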

  • As I mentioned, a row-based UDF would not work in this case. We need a UDAF, which is not available in Spark 2.2, and I am looking for alternatives without using collect_list, because 1/ it loses Spark's distributed power on that single column; we are turning it into a plain Python list, and 2/ collect_list will have trouble with millions of records – Kenny Apr 04 '19 at 02:11
    Seems like you didn't catch this part of the accepted answer - "You are passing a pyspark dataframe, df_whitelist to a UDF, pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF which is not acceptable (not possible)." – user10938362 Apr 04 '19 at 09:07

1 Answer


It seems you did not register your UDF. Import the udf function and register the UDF as shown below; that should work.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

myFunc = udf(myFunc, StringType())

  • As far as I understand, udf is for Spark DataFrames and is row-based. I am accessing the columns as an RDD because this is column-based, not row-based. Besides, udf is not for aggregate functions, I think; that would be a UDAF, which would have solved my problem, but unfortunately I don't have it. Also, the result is expected to be a pandas DataFrame. Would StringType() work? – Kenny Apr 02 '19 at 18:21