
While running PySpark 3 with pandas 1.1.5 and pyarrow 2.0.0, I am getting the error below:

Spark Code:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
df_sp = spark.createDataFrame(df)

df_sp.cache().count()

schema = df_sp.schema

def dummy_udf(data):
    return data

res = df_sp.groupby('col1').applyInPandas(dummy_udf, schema=schema)

print(res.cache().count())
print(res.toPandas())

Exception:

21/09/17 07:28:10 ERROR util.Utils: Uncaught exception in thread stdout writer for python3
java.lang.NoSuchMethodError: com.google.flatbuffers.FlatBufferBuilder.createString(Ljava/lang/CharSequence;)I
        at org.apache.arrow.vector.types.pojo.Field.getField(Field.java:204)
        at org.apache.arrow.vector.types.pojo.Schema.getSchema(Schema.java:178)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serializeMetadata(MessageSerializer.java:187)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:165)
        at org.apache.arrow.vector.ipc.ArrowWriter.ensureStarted(ArrowWriter.java:159)
        at org.apache.arrow.vector.ipc.ArrowWriter.start(ArrowWriter.java:112)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:86)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
21/09/17 07:28:10 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python3,5,main]
java.lang.NoSuchMethodError: com.google.flatbuffers.FlatBufferBuilder.createString(Ljava/lang/CharSequence;)I
        at org.apache.arrow.vector.types.pojo.Field.getField(Field.java:204)
        at org.apache.arrow.vector.types.pojo.Schema.getSchema(Schema.java:178)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serializeMetadata(MessageSerializer.java:187)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:165)
        at org.apache.arrow.vector.ipc.ArrowWriter.ensureStarted(ArrowWriter.java:159)
        at org.apache.arrow.vector.ipc.ArrowWriter.start(ArrowWriter.java:112)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:86)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
21/09/17 07:28:10 WARN storage.BlockManager: Putting block rdd_25_69 failed due to exception org.apache.spark.SparkException: Python worker exited unexpectedly (crashed).
21/09/17 07:28:10 INFO memory.MemoryStore: MemoryStore cleared
21/09/17 07:28:10 INFO storage.BlockManager: BlockManager stopped
21/09/17 07:28:10 INFO util.ShutdownHookManager: Shutdown hook called

1 Answer

Looking at the createString() method implementations in the FlatBufferBuilder.java class, there are two overloads: one accepts a CharSequence and the other accepts a ByteBuffer as an argument.

public int createString(CharSequence s) {
    // ...
}

public int createString(ByteBuffer s) {
    // ...
}

Looking at the getField() method implementation in the Field.java class, a String value is passed to createString():

public class Field {
    private final String name;

    public int getField(FlatBufferBuilder builder) {
        int nameOffset = name == null ? -1 : builder.createString(name);
        // ...
    }
}

To fix this issue, we need to pass either a CharSequence or a ByteBuffer as the argument to createString() inside the getField() method.

Solution:

public int getField(FlatBufferBuilder builder) {
    // Wrap the name's bytes in a ByteBuffer so the ByteBuffer overload is used;
    // guard against a null name before calling getBytes().
    int nameOffset = name == null
            ? -1
            : builder.createString(java.nio.ByteBuffer.wrap(name.getBytes()));
    .......
}
  • the question is in python and you've offered no instruction on how to fix the problem in pyspark – Aaron Zolnai-Lucas Dec 12 '22 at 17:45
  • The issue is coming from the arrow framework. We need to solve the issue from the arrow framework. – Ranga Reddy Dec 13 '22 at 12:32
  • The question contains python code running on pyspark. The arrow framework is a transitive dependency of spark - the user does not interface with arrow directly (see the question). If the issue has to be fixed in arrow, this is a bug in spark/arrow. Are you saying that you have identified a bug in pyspark? If not, please provide details of how your code snippets can be used to fix the `.toPandas()` call – Aaron Zolnai-Lucas Dec 13 '22 at 13:48
  • Besides, `java.lang.String` extends `java.lang.CharSequence` (see https://docs.oracle.com/javase/8/docs/api/java/lang/String.html) so it is already valid for arrow to pass a `String` to `FlatBufferBuilder.createString` in the link you shared (https://github.com/apache/arrow/blob/apache-arrow-2.0.0/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java#L204) - wrapping it in a `ByteBuffer` as your answer does would not make any difference and is misleading as an answer to this question – Aaron Zolnai-Lucas Dec 13 '22 at 14:17