
I am trying to clean a Spark DataFrame by mapping it to an RDD and then back to a DataFrame. Here's a toy example:

from pyspark.sql import Row

def replace_values(row, sub_rules):
    d = row.asDict()
    for col, old_val, new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val
    return Row(**d)

ex = sc.parallelize([{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)
(ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in ex.columns]))
    .toDF(schema=ex.schema))

Running the code above results in a Py4JError with a very long stack trace ending in the following:

Py4JError: An error occurred while calling o801.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

What is going on here? How do I fix it? I am using PySpark 1.5.2.


1 Answer


The error is caused by the reference to ex.columns inside the .map(lambda ...) call. You can't reference an RDD or DataFrame from within the function passed to an RDD transformation: the function's closure gets pickled and shipped to the executors, and the DataFrame is just a thin wrapper around a JVM object that can't be serialized, which is why Py4J complains about __getnewargs__. Spark is supposed to issue a more helpful error in this case, but apparently that check didn't make it into this version.

The solution is to replace such references with driver-local copies of the referenced values:

import copy
from pyspark.sql import Row

def replace_values(row, sub_rules):
    d = row.asDict()
    for col, old_val, new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val
    return Row(**d)

ex = sc.parallelize([{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)
# Copy the column names into a plain Python list on the driver so the
# lambda below closes over that list instead of the DataFrame 'ex'.
cols = copy.deepcopy(ex.columns)
(ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in cols]))
    .toDF(schema=ex.schema))
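
As a quick sanity check (a minimal sketch, assuming the same sc / sqlContext session as above), collecting the result shows the 1 replaced by 3:

# The lambda now closes over the plain Python list 'cols' rather than the
# DataFrame 'ex', so the closure can be pickled and shipped to the executors.
cleaned = (ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in cols]))
             .toDF(schema=ex.schema))
print(cleaned.collect())
# Expected output along the lines of:
# [Row(age=3, name='Alice'), Row(age=2, name='Bob')]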
You're right about the direct reason, but it is a different issue than the JIRA you've linked. Ignoring the pickling issue, it is pretty much the same problem as I've described [here](http://stackoverflow.com/q/31684842/1560062). Since a DataFrame is just a wrapper around a JVM object, it cannot be accessed from inside a PySpark action / transformation. But it would work just fine in Scala. – zero323 Dec 24 '15 at 00:14