
In PySpark, I need to create a UDF that takes multiple columns as input and returns multiple columns as output. Inside the UDF, I need to query another DataFrame, using key values from the current DataFrame's row that are passed in as UDF parameters.

I get this error:

pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o123.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist

I tried removing the query so that the UDF only takes multiple inputs and returns multiple outputs, and that works. But as soon as I add the query back, the program fails at that point.

Am I passing the multiple inputs incorrectly? Or is it simply not possible to query a DataFrame inside a UDF in PySpark? If so, what should I do instead?

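from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema=StructType([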
    StructField("parm1", StringType(), False),
    StructField("parm2", FloatType(), False),
    StructField("parm3", StringType(), False)])

def UDF_history(mg,ts,hl):

    # Each comparison needs its own parentheses because & binds tighter than == and <=.
    s_selection=shistory.filter((shistory['ID']==mg) & (shistory['STARTHOUR']<=ts) & (shistory['Total_Records']>=SampleThreshold))
    '''
    do calculation on the s_selection
    '''

    return (parm1,parm2,parm3)

history_UDF=udf(UDF_history,schema)
saccum=shourly.withColumn("history_status", history_UDF(shourly.ID,shourly.STARTHOUR,shourly.cell_list))
saccum=saccum.select("ID","STARTHOUR","history_status.*")

My goal: for every (ID, STARTHOUR) combination, find the rows in the shistory DataFrame with the same ID and a STARTHOUR <= this STARTHOUR, aggregate those rows, and calculate some values over them. For example, take the average of column C over the aggregated rows.
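
To make the intended result concrete, here is a small plain-Python reference for the computation described above. It is only a sketch of the semantics, not Spark code: the function name `history_average`, the tuple layout, and the sample values are all hypothetical, and the `Total_Records` threshold is left out for brevity.

```python
def history_average(hourly_keys, history_rows):
    """For each (ID, STARTHOUR) key, average C over the history rows that
    share the ID and have STARTHOUR <= the key's STARTHOUR.
    Returns None for a key when no history row matches."""
    out = {}
    for key_id, key_hour in hourly_keys:
        values = [
            c
            for (hist_id, hist_hour, c) in history_rows
            if hist_id == key_id and hist_hour <= key_hour
        ]
        out[(key_id, key_hour)] = sum(values) / len(values) if values else None
    return out


# Hypothetical sample data: (ID, STARTHOUR) keys and (ID, STARTHOUR, C) history rows.
hourly_keys = [("a", 2), ("a", 3), ("b", 1)]
history_rows = [("a", 1, 10.0), ("a", 2, 20.0), ("a", 3, 60.0), ("b", 5, 7.0)]

expected = history_average(hourly_keys, history_rows)
print(expected)
```

Any Spark-side solution should reproduce this mapping for the same inputs.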

Error messages below:


    Traceback (most recent call last):
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 235, in dump
        return Pickler.dump(self, obj)
      File "/.../lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/.../lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/.../lib/python2.7/pickle.py", line 554, in save_tuple
        save(element)
      File "/.../lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 372, in save_function
        self.save_function_tuple(obj)
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 525, in save_function_tuple
        save(f_globals)
      File "/.../lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/.../lib/python2.7/pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/.../lib/python2.7/pickle.py", line 687, in _batch_setitems
        save(v)
      File "/.../lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 804, in save_reduce
        save(state)
      File "/.../lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/.../lib/python2.7/pickle.py", line 655, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/.../lib/python2.7/pickle.py", line 687, in _batch_setitems
        save(v)
      File "/.../lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 324, in get_return_value
        format(target_id, ".", name, value))
    Py4JError: An error occurred while calling o123.__getnewargs__. Trace:
    py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)


    Traceback (most recent call last):
      File "/.../prediction_pyspark.py", line 521, in <module>
        process_data(initial_data)
      File "/.../prediction_pyspark.py", line 277, in process_data
        saccum=shourly.withColumn("history_status",history_UDF(shourly.ID,shourly.STARTHOUR,shourly.cell_list))
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 186, in wrapper
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 164, in __call__
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 148, in _judf
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 157, in _create_judf
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 33, in _wrap_function
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2389, in _prepare_for_python_RDD
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 568, in dumps
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 918, in dumps
      File "/usr/hdp/2.6.5.4-1/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 249, in dump
    pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o123.__getnewargs__. Trace:
    py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)


Helen Z
  • You can not reference another dataframe inside a udf. – pault Jul 23 '19 at 02:29
  • Possible duplicate of [PySpark Throwing error Method \_\_getnewargs\_\_(\[\]) does not exist](https://stackoverflow.com/questions/40470487/pyspark-throwing-error-method-getnewargs-does-not-exist) – pault Jul 23 '19 at 02:29
  • @pault, so is there any way to achieve my goal? – Helen Z Jul 23 '19 at 04:11
  • It is the same underlying issue, as shown in the duplicate: *Using spark inside any transformation that occurs on executors is not allowed*. As for an alternative method, have you tried using a join? – pault Jul 23 '19 at 17:47
  • I joined df1 and df2: df1 left outer join df2 on df2.ID==df1.ID and df2.STARTHOUR<=df1.STARTHOUR. It works! Thanks. – Helen Z Jul 23 '19 at 19:53
