
I'm using PySpark in Zeppelin 0.7.3 with Spark 2.2.0.

In one Zeppelin %pyspark paragraph I create an RDD using various methods (mapPartitions and flatMap) and finally call collect() to look at the results as a list. I can run this paragraph repeatedly and it always works.
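
Roughly, that first paragraph looks like the following. This is only a sketch to show the shape of the code; the helper functions, the input path and the data are placeholders, not what I'm actually running:

```python
# Zeppelin %pyspark paragraph 1 (sketch: parse_partition, expand_row and the
# input path are placeholders, not my real code)
def parse_partition(rows):
    # stands in for my real mapPartitions function
    for row in rows:
        yield row.strip().split(',')

def expand_row(fields):
    # stands in for my real flatMap function
    return [(fields[0], f) for f in fields[1:]]

rdd = (sc.textFile('/tmp/input.txt')   # placeholder input
         .mapPartitions(parse_partition)
         .flatMap(expand_row))
print(rdd.collect())                   # this paragraph works every time
```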

But when I then try to create a Spark DataFrame from that RDD in a second Zeppelin paragraph, using spark.createDataFrame(), and call show() on that DataFrame, it works as expected only the first time I execute it! If I run this second paragraph a 2nd, 3rd time, etc. I get a long stack trace ending in this:

  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/home/lucas/spark/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/lucas/spark/spark-2.2.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/lucas/spark/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
    format(target_id, ".", name, value))
Py4JError: An error occurred while calling o1571.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
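
For reference, that second paragraph is essentially nothing more than the following (again a sketch; the column names are made up):

```python
# Zeppelin %pyspark paragraph 2 (sketch; `spark` is the SparkSession Zeppelin provides)
df = spark.createDataFrame(rdd, ['key', 'value'])
df.show()   # fine on the first run, Py4JError on every run after that
```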

If I re-run the first paragraph that creates the RDD then the first time I run the second paragraph it works; the next time it fails with the same error.

Has anyone else seen odd behaviour like this, and can you think what might be causing it?

The answer to "PySpark Throwing error Method __getnewargs__([]) does not exist" says not to nest RDDs or pass the SparkContext to the executor nodes. I'm not doing that as far as I can tell. I've also checked that all the args to the function I'm calling in mapPartitions() are picklable and can be passed in a dummy function.
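
That check was roughly the following (a sketch; `arg1`/`arg2`/`arg3` just stand in for the real arguments):

```python
import pickle

# Placeholders standing in for the real arguments I pass to my function
arg1, arg2, arg3 = 'a string', {'some': 'dict'}, [1, 2, 3]

# Each argument survives a pickle round-trip...
for arg in (arg1, arg2, arg3):
    pickle.loads(pickle.dumps(arg))

# ...and can be captured by a dummy function shipped to the executors
dummy = sc.parallelize(range(4), 2)
print(dummy.mapPartitions(lambda part: [(arg1, arg2, arg3)]).collect())
```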

I haven't tried running this outside of Zeppelin yet in case it's a Zeppelin bug but I'm out of other ideas...

snark
  • Can you please write the code here, showing how the sqlContext is created? – Manu Gupta Feb 02 '18 at 13:53
  • Sorry, no, not easily, otherwise I would've done that already. (Zeppelin creates the `SparkSession` variable for you, and calls it `spark`. I'm not using `SQLContext`.) I'm just wondering if anyone else has seen behaviour like this in Spark and what they did to fix it. I believe I've already ruled out nested RDDs or sending `spark` to the executor nodes. – snark Feb 02 '18 at 17:21
  • In normal Spark, the Spark session variable creates the Spark context, right? I haven't worked with Zeppelin. In general, we first create a SparkContext, then on top of that we create a SQLContext, and it's that sqlContext.createDataFrame command we use. Let me do some analysis on Zeppelin – Manu Gupta Feb 02 '18 at 19:19
  • Checked, you are right; I was using Spark 1.6. Can you try the older way of creating the DataFrame? I think Zeppelin supports that as well. Also, you can check rdd.toDF() to create the DataFrame. – Manu Gupta Feb 02 '18 at 19:27
  • I can try these when I'm back in the office on Monday, although I must admit I'll be surprised if they make any difference...:) – snark Feb 03 '18 at 10:14
  • Sure enough, using `sqlContext.createDataFrame()` makes no difference - it works the first time but not the second. I'm not sure `rdd.toDF()` is available any more in recent versions of Spark; I think it was deprecated. There is a `toDF()` method but only on existing DataFrames: https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF – snark Feb 05 '18 at 10:20
  • I was creating the RDDs in a `def _transform(self, dataset)` method of my own custom ML Pipeline class inherited from `pyspark.ml.pipeline.Model`. Inside `_transform()` I was passing a function to `RDD.mapPartitions()`. As long as the function passed is defined outside of `_transform()` I can create my Spark `DataFrame` reliably and repeatedly. But if the function passed to `mapPartitions()` was defined inside `_transform()` then it would only work the first time, for some reason. Possibly the function went out of scope after the first execution. But anyway it works now! (There's a stripped-down sketch of the pattern after these comments.) – snark Feb 05 '18 at 16:42
  • Great. Thanks for sharing the reason. – Manu Gupta Feb 05 '18 at 17:11
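
For anyone who hits the same thing, here is a stripped-down sketch of the before/after pattern described in the comments above (the class and function names are placeholders, not the real pipeline code, and `spark` is the session Zeppelin provides):

```python
from pyspark.ml import Model

# Works repeatedly: the partition function lives at module level
def process_partition(rows):
    return [r for r in rows]   # placeholder for the real per-partition work

class MyModel(Model):          # hypothetical stand-in for the custom Model subclass
    def _transform(self, dataset):
        rdd = dataset.rdd.mapPartitions(process_partition)
        return spark.createDataFrame(rdd, dataset.schema)

# Only worked the first time: the same function defined *inside* _transform()
class MyBrokenModel(Model):
    def _transform(self, dataset):
        def process_partition(rows):
            return [r for r in rows]
        rdd = dataset.rdd.mapPartitions(process_partition)
        return spark.createDataFrame(rdd, dataset.schema)
```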

0 Answers