
I have a Dataproc cluster with Anaconda. I created a conda virtual environment, my-env, because I need to install the open-source RDKit there, so I installed PySpark again inside that environment (rather than using the pre-installed one). The code below now fails inside my-env but works outside of it.

Code:

from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql import SparkSession
from py4j.protocol import Py4JJavaError
spark = SparkSession.builder.appName("test").getOrCreate()

fields = [StructField("col0", StringType(), True),
          StructField("col1", StringType(), True),
          StructField("col2", StringType(), True),
          StructField("col3", StringType(), True)]
schema = StructType(fields)

chem_info = spark.createDataFrame([], schema)

This is the error I'm getting:

  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/sql/session.py", line 749, in createDataFrame
    jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/rdd.py", line 2297, in _to_java_object_rdd
    rdd = self._pickled()
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/rdd.py", line 196, in _pickled
    return self._reserialize(AutoBatchedSerializer(PickleSerializer()))
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/rdd.py", line 594, in _reserialize
    self = self.map(lambda x: x, preservesPartitioning=True)
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/rdd.py", line 325, in map
    return self.mapPartitionsWithIndex(func, preservesPartitioning)
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/rdd.py", line 365, in mapPartitionsWithIndex
    return PipelinedRDD(self, f, preservesPartitioning)
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/rdd.py", line 2514, in __init__
    self.is_barrier = prev._is_barrier() or isFromBarrier
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/rdd.py", line 2414, in _is_barrier
    return self._jrdd.rdd().isBarrier()
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/.conda/envs/my-env/lib/python3.6/site-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o57.isBarrier. Trace:
py4j.Py4JException: Method isBarrier([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Can you help me resolve it?

Igor Dvorzhak
sopana

1 Answer


As mentioned in the question "pyspark: Method isBarrier([]) does not exist", this error is caused by an incompatibility between the version of Spark installed on the Dataproc cluster and the version of PySpark that you manually installed in your conda environment. The Python side calls a JVM method (isBarrier) that the cluster's Spark does not provide.

To solve this issue, check the Spark version on the cluster and install the matching version of PySpark:

$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_232

$ conda install pyspark==2.4.4
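
If you want to automate this check, here is a minimal sketch. The helper names (parse_spark_version, versions_match, check_environment) are made up for illustration and are not part of PySpark. It assumes the banner contains "version X.Y.Z" as in the output above, and that spark-submit is on the PATH of the node where you run it.

```python
import re
import subprocess


def parse_spark_version(banner: str) -> str:
    """Return the first 'version X.Y.Z' found in a spark-submit banner."""
    match = re.search(r"version\s+(\d+(?:\.\d+)+)", banner)
    if match is None:
        raise ValueError("no Spark version found in banner")
    return match.group(1)


def versions_match(cluster_version: str, pyspark_version: str) -> bool:
    """Exact comparison; an identical version is the safest choice."""
    return cluster_version == pyspark_version


def check_environment() -> bool:
    """Compare the cluster's Spark with the PySpark in the active env.

    Not called automatically: it needs spark-submit and pyspark installed.
    """
    import pyspark  # imported lazily so the helpers above work without it

    # Merge stderr into stdout so the banner is captured whichever
    # stream spark-submit writes it to.
    result = subprocess.run(
        ["spark-submit", "--version"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    cluster_version = parse_spark_version(result.stdout)
    return versions_match(cluster_version, pyspark.__version__)
```

Calling check_environment() inside my-env should return False while the mismatch exists, and True once the PySpark version in the environment matches the cluster's.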
Igor Dvorzhak