4

I am trying to apply a function to each group of a dataset in pyspark. The first error I was getting was

Py4JError: An error occurred while calling o62.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

To solve the above, I removed the spark function (I had spark.range()). Now the error is solved but I now get the following:

File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/serializers.py", line 276, in load_stream
    import pyarrow as pa
ImportError: No module named pyarrow 

But when I try it on its own it works, ie.

df = pd.DataFrame({"a": [1, 2, 3]})
pa.Table.from_pandas(df)
pyarrow.Table
a: int64
__index_level_0__: int64
metadata
--------
{'pandas': '{"pandas_version": "0.23.0", "index_columns": ["__index_level_0__"], "columns": [{"metadata": null, "field_name": "a", "name": "a", "numpy_type": "int64", "pandas_type": "int64"}, {"metadata": null, "field_name": "__index_level_0__", "name": null, "numpy_type": "int64", "pandas_type": "int64"}], "column_indexes": [{"metadata": null, "field_name": null, "name": null, "numpy_type": "object", "pandas_type": "bytes"}]}'}

EXAMPLE THAT FAILS - Taken from here

import pyspark.sql.functions as F
import pandas as pd

cols = ['id', 'val']
vals = [('A', 5), ('A', 3), ('A', 7), ('B', 12), ('B', 15), ('C', 3)]
d1 = sqlContext.createDataFrame(vals, cols)

>>> @F.pandas_udf(d1.schema, F.PandasUDFType.GROUPED_MAP)
... def subtract_mean(pdf):
...     return pdf.assign(v=pdf.v - pdf.v.mean())
...
>>> try1 = d1.groupby('id').apply(subtract_mean)
>>> try1.show()

If I try to convert to pandas dataframe, it does not work because It does not have the schema attribute.

What am I missing here?

Sotos
  • 51,121
  • 6
  • 32
  • 66

2 Answers2

6

pyarrow has to be present on the path on each worker node.

  • If you run this code on as single node, make sure that PYSPARK_PYTHON (and optionally its PYTHONPATH) are the same as the interpreter you use to test pyarrow code.
  • If you use cluster, make sure that pyarrow is installed on each node, additionally to points made above.

Moreover make sure that installed pyarrow version is greater or equal than minimum supported (0.8 today) - although this should cause different exception.

  • Thank you for your effort. I m on a cluster so this could be it. I just talked with the dev that installed spark et al, and he wasn't sure. I will leave this question open until I test it tomorrow. If that was it I will accept your answer. – Sotos Jun 28 '18 at 14:06
  • I am getting the same error from pyspark interactive shell. However [pyspark always runs in client mode] (https://stackoverflow.com/questions/38384224/how-to-know-deploy-mode-of-pyspark-application) then why do we need to pyarrow is installed on each node? – user 923227 Jul 24 '18 at 22:22
0

Adding to the accepted answer :

One issue can be user delegation: ie

  • python module installed with userx
  • spark-submit launched by userx
  • but spark is the technical user executing logic on the workers.
    (behind the scenes, without you noticing).

It might be that sparkis not seeing the module.

To fix that :
su as spark, and install the missing module via pipinstall (on all workers)

That fixed it for me.

Edit :
depending on your settings pyspark shell might still be able to see your module though.

Mehdi LAMRANI
  • 11,289
  • 14
  • 88
  • 130