0

I am trying to use Pandas "apply" inside the parallelized code but the "apply" is not working at all. Can we use "apply" inside the code which gets distributed to the executors while using Spark (parallelize on RDD)?

Code:

def testApply(k):
    return pd.DataFrame({'col1':k,'col2':[k*2]*5})

def testExec(x):
    df=pd.DataFrame({'col1':range(0,10)})
    ddf=pd.DataFrame(columns=['col1', 'col2'])
    ##In my case the below line doesn't get executed at all
    res= df.apply(lambda row: testApply(row.pblkGroup) if row.pblkGroup%2==0 else pd.DataFrame(), axis=1)

list1=[1,2,3,4]
sc=SparkContext.getOrCreate()
testRdd= sc.parallelize(list1)
output=testRdd.map(lambda x: testExec(x)).collect()



3 Answers3

0

To use Pandas inside Spark you have 2 options:-

Using Closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

More details can be found here [1]

Example

import numpy as np
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()

B = [2,0,1,0] 
V = [5,1,2,4]

def V_sum(row,b,c):
    return float(np.sum(c[row==b]))

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())    
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))))

Details can be found here [2]

Using Pandas UDF

With Spark 2.4.4 there is an out of the box to use Pandas with Spark. The details can be found here along with examples [3]

1 - http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures- 2 - Custom function over pyspark dataframe 3 - https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html

Community
  • 1
  • 1
Jayadeep Jayaraman
  • 2,747
  • 3
  • 15
  • 26
  • Thanks for the suggestions. Looks like Pandas with version lower than 0.21 does not support this functionality. I have upgraded the Pandas version and it's working fine. – Parikshit Deshmukh Oct 28 '19 at 20:09
0

Looks like Pandas with version lower than 0.21 does not support this functionality. I have upgraded the Pandas version and it's working fine.

0

I am also getting the same error (TypeError: an integer is required (got type bytes)

from pyspark.context import SparkContext

TypeError                                 Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_11288/3937779276.py in <module>
----> 1 from pyspark.context import SparkContext

~\miniconda3\lib\site-packages\pyspark\__init__.py in <module>
     49 
     50 from pyspark.conf import SparkConf
---> 51 from pyspark.context import SparkContext
     52 from pyspark.rdd import RDD, RDDBarrier
     53 from pyspark.files import SparkFiles

~\miniconda3\lib\site-packages\pyspark\context.py in <module>
     29 from py4j.protocol import Py4JError
     30 
---> 31 from pyspark import accumulators
     32 from pyspark.accumulators import Accumulator
     33 from pyspark.broadcast import Broadcast, BroadcastPickleRegistry

~\miniconda3\lib\site-packages\pyspark\accumulators.py in <module>
     95     import socketserver as SocketServer
     96 import threading
---> 97 from pyspark.serializers import read_int, PickleSerializer
     98 
     99 

~\miniconda3\lib\site-packages\pyspark\serializers.py in <module>
     69     xrange = range
     70 
---> 71 from pyspark import cloudpickle
     72 from pyspark.util import _exception_message
     73 

~\miniconda3\lib\site-packages\pyspark\cloudpickle.py in <module>
    143 
    144 
--> 145 _cell_set_template_code = _make_cell_set_template_code()
    146 
    147 

~\miniconda3\lib\site-packages\pyspark\cloudpickle.py in _make_cell_set_template_code()
    124         )
    125     else:
--> 126         return types.CodeType(
    127             co.co_argcount,
    128             co.co_kwonlyargcount,

TypeError: an integer is required (got type bytes

)

Hafez Ahmad
  • 175
  • 2
  • 7