I am trying to apply aggregate functions to the columns of a PySpark DataFrame in parallel. The constraint is that I only have Spark 2.2 at hand, so using a vectorized pandas_udf is not an option.
import random
sdt = spark.createDataFrame(zip([random.randint(1, 100) for x in range(20)], [random.randint(1, 100) for x in range(20)]), schema=['col1', 'col2'])
+----+----+
|col1|col2|
+----+----+
| 19| 51|
| 95| 56|
| 11| 94|
| 80| 99|
| 20| 80|
| 38| 91|
| 18| 88|
| 4| 33|
+----+----+
In order to parallelize over the columns, I put the column names into an RDD:
sdt_col_rdd = sc.parallelize(sdt.columns)
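(To be explicit, this RDD only holds the two column-name strings, not any of the data:)

sdt_col_rdd.collect()  # ['col1', 'col2'] -- just the column names taken from the driver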
Testing works fine with an ordinary Python function that returns a pandas DataFrame:
import numpy as np
import pandas as pd

x = sdt_col_rdd.map(lambda col: (col, pd.DataFrame(np.random.randint(0, 100, size=(2, 4)), columns=list('ABCD'))))
y = x.collectAsMap()  # collect into a dictionary with column names as keys
print(y['col1']); print(y['col2'])
A B C D
0 14 55 4 57
1 36 84 53 51
A B C D
0 14 55 4 57
1 36 84 53 51
Switching to the Spark DataFrame: here is a sample function that also returns a pandas DataFrame, but it processes a Spark DataFrame and uses Spark's native aggregations, transformations, actions, etc.:
from pyspark.sql import functions as F

def myFunc(df, c):
    # other processing, aggregation, transformation may be performed here
    res = df.agg((F.min(c) - 1).alias("min_" + c), (F.max(c) + 1).alias("max_" + c)).toPandas()
    res["col_name"] = c
    return res
The function works fine on its own:
myFunc(sdt.select('col1'), 'col1')
min_col1 max_col1 col_name
0 4 100 col1
Issues arise when I put that inside the RDD map, similar to what was done above:
x = sdt_col_rdd.map(lambda col: (col, myFunc(sdt.select(col), col)))
y = x.collectAsMap()
Any idea how to achieve this kind of per-column transformation/action in Spark in parallel, without a UDAF? collect_list would not be efficient here because of the huge dataset, and it would not exploit Spark's native functions. (A serial sketch of what I am ultimately after is shown below, after the traceback.) Here is the full traceback from the collectAsMap call:
During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
<ipython-input> in <module>()
      1 col_map = sdt_col_rdd.map(lambda col: (col,myFunc(sdt.select(col), col)))
----> 2 y = col_map.collectAsMap()

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in collectAsMap(self)
   1555         4
   1556         """
-> 1557         return dict(self.collect())
   1558
   1559     def keys(self):

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in collect(self)
    794         """
    795         with SCCallSiteSync(self.context) as css:
--> 796             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    797         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    798

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in _jrdd(self)
   2440
   2441         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2442                                       self._jrdd_deserializer, profiler)
   2443         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
   2444                                              self.preservesPartitioning)

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
   2373     assert serializer, "serializer should not be empty"
   2374     command = (func, profiler, deserializer, serializer)
-> 2375     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2376     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2377                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2359     # the serialized command will be compressed by broadcast
   2360     ser = CloudPickleSerializer()
-> 2361     pickled_command = ser.dumps(command)
   2362     if len(pickled_command) > (1 << 20):  # 1M
   2363         # The broadcast will have same life cycle as created PythonRDD

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/serializers.py in dumps(self, obj)
    462
    463     def dumps(self, obj):
--> 464         return cloudpickle.dumps(obj, 2)
    465
    466

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/cloudpickle.py in dumps(obj, protocol)
    702
    703     cp = CloudPickler(file,protocol)
--> 704     cp.dump(obj)
    705
    706     return file.getvalue()

/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/cloudpickle.py in dump(self, obj)
    160             msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    161             print_exec(sys.stderr)
--> 162             raise pickle.PicklingError(msg)
    163
    164     def save_memoryview(self, obj):

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o62.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
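I assume the root cause is that sdt (and the SparkSession behind it) gets captured in the map closure, and the Py4J-backed DataFrame cannot be pickled and shipped to the executors. For clarity, here is a serial, driver-side sketch of the result I am ultimately after, using only the names defined above; the question is how to compute these per-column results in parallel:

# Serial, driver-side equivalent of the desired result: one small pandas DataFrame per
# column, keyed by column name. Each call still uses Spark's native agg(), but the
# columns are processed one after another rather than in parallel.
results = {c: myFunc(sdt.select(c), c) for c in sdt.columns}
print(results['col1'])  # one-row frame with min_col1, max_col1, col_name
print(results['col2'])  # one-row frame with min_col2, max_col2, col_name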