
I am trying to enable Apache Arrow for conversion to Pandas. I am using:

  • pyspark 2.4.4
  • pyarrow 0.15.0
  • pandas 0.25.1
  • numpy 1.17.2

This is the example code

    import pandas as pd

    # `spark` is an existing SparkSession
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    x = pd.Series([1, 2, 3])
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

I got this warning message

c:\users\administrator\appdata\local\programs\python\python37\lib\site-packages\pyspark\sql\session.py:714: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
  An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.readArrowStreamFromFile.
: java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.<init>(ArrowConverters.scala:229)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.getBatchesFromStream(ArrowConverters.scala:228)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:216)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:214)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.readArrowStreamFromFile(ArrowConverters.scala:214)
    at org.apache.spark.sql.api.python.PythonSQLUtils$.readArrowStreamFromFile(PythonSQLUtils.scala:46)
    at org.apache.spark.sql.api.python.PythonSQLUtils.readArrowStreamFromFile(PythonSQLUtils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
  warnings.warn(msg)
R. Lamari
  • Possible duplicate of [pandasUDF and pyarrow 0.15.0](https://stackoverflow.com/questions/58273063/pandasudf-and-pyarrow-0-15-0) – pault Oct 28 '19 at 15:25

2 Answers


We made a change in 0.15.0 that makes the default behavior of pyarrow incompatible with older versions of Arrow in Java -- your Spark environment seems to be using an older version.

Your options are

  • Set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 in the environment where you run Python.
  • Downgrade to pyarrow < 0.15.0 for now.
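
As a minimal sketch of the first option, the variable can be set from Python itself before any Arrow-backed conversion runs. The helper name `enable_pre_0_15_ipc_format` is hypothetical, and this assumes pyarrow reads the variable from the process environment when it serializes data. It only affects the current Python process (typically the driver); executors may need the variable set separately.

```python
import os


def enable_pre_0_15_ipc_format(env=None):
    """Set ARROW_PRE_0_15_IPC_FORMAT=1 in the given mapping (default: os.environ).

    Hypothetical helper for illustration; it only writes the environment variable.
    """
    target = os.environ if env is None else env
    target["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    return target


# Call this before any Arrow-backed conversion,
# e.g. before spark.createDataFrame(pandas_df).
enable_pre_0_15_ipc_format()
```

Setting the variable in the shell that launches Python should be equivalent for the driver process.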
Wes McKinney
  • Wow, this has been bugging me for days! I had a very simple groupBy.apply statement with a PandasUDF method and it was constantly failing where it didn't before. Turns out that the build environment had started using 0.15 ><. – Dammi Oct 09 '19 at 09:52
  • THIS IS THE ANSWER. Please put the checkmark underneath it :) thank you @Wes McKinney – Ravaal Dec 20 '19 at 16:46
  • Is this answer still current? – xiaodai Aug 17 '20 at 17:24

I was calling a pandas UDF on a Spark 2.4.4 cluster with pyarrow==0.15, and I struggled to set the ARROW_PRE_0_15_IPC_FORMAT=1 flag mentioned above so that it actually took effect.

I set the flag (1) on the command line via export on the head node, (2) via spark-env.sh and yarn-env.sh on all nodes in the cluster, and (3) in the pyspark code itself from my script on the head node. None of these actually set the flag inside the UDF, for reasons I could not determine.

The simplest solution I found was to call this inside the udf:

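    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf("integer", PandasUDFType.SCALAR)
    def foo(*args):
        import os
        # Set the flag in the Python worker process where the UDF runs
        os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
        #...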

Hopefully this saves someone else several hours.

K.S.
  • Thank you! The other solutions did not work for us, but this did. AWS EMR 2.4.x, so perhaps the config setting is not so easy on EMR as it is on other spark solutions. – Will Faithfull Mar 01 '21 at 17:21
  • This saved me several hours. For some reason my dev endpoint has pyarrow 0.13.0 which works fine and the AWS Glue cluster has 0.16.0 which fails. This solution is a good workaround until we fix it properly. – Rob Fisher Mar 17 '21 at 12:25
  • Use `--conf spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1` when launching your job. If you are using AWS Glue, pass it using the section "Job parameters" (a key/value properties). The key is `--conf` and the value is `spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1` – noleto Nov 08 '21 at 13:24
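
The `spark.executorEnv.*` approach from the last comment can be sketched in Python as follows. The helper names (`executor_env_conf`, `spark_submit_args`, `build_session`) are hypothetical and only illustrate how the property could be assembled. `build_session` assumes pyspark is installed and that the session is created fresh, since executor environment settings are unlikely to apply to executors that are already running.

```python
def executor_env_conf(name, value):
    """Return the Spark property that exports an environment variable to executors."""
    return {"spark.executorEnv." + name: value}


def spark_submit_args(conf):
    """Turn a dict of Spark properties into spark-submit style arguments."""
    args = []
    for key, val in sorted(conf.items()):
        args += ["--conf", key + "=" + val]
    return args


def build_session(app_name, conf):
    """Create a SparkSession with the given properties (requires pyspark)."""
    from pyspark.sql import SparkSession  # imported lazily so the helpers above work without pyspark

    builder = SparkSession.builder.appName(app_name)
    for key, val in conf.items():
        builder = builder.config(key, val)
    return builder.getOrCreate()


conf = executor_env_conf("ARROW_PRE_0_15_IPC_FORMAT", "1")
print(spark_submit_args(conf))
# prints ['--conf', 'spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1']
```

Passing the property at launch avoids having to set the variable inside every UDF body.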