
I am trying to convert a Spark DataFrame to a pandas DataFrame, but I am getting an error.

Config for Spark Session

{
"conf": 
    {
        "spark.pyspark.python": "python", 
        "spark.pyspark.virtualenv.enabled": "true", 
        "spark.pyspark.virtualenv.type": "native", 
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv"
    }
}

Code

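import os
# Proxy settings apply only to the current Python process (the driver) and its child processes.
os.environ['http_proxy'] = "http://10.10.10.1:443"
os.environ['https_proxy'] = "http://10.10.10.1:443"
sc.install_pypi_package("pandas")
# Remove the proxy settings again once the package is installed.
del os.environ['http_proxy']
del os.environ['https_proxy']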
some_df = sc.parallelize([
 ("A", "no"),
 ("B", "yes"),
 ("B", "yes"),
 ("B", "no")]
 ).toDF(["user_id", "phone_number"])

pandas_df = some_df.toPandas()

Before running the code above, I verify that the pandas package is listed, using the following code.

sc.list_packages()

However, when the code runs, it seems to try to install pandas on all the executors as well, and that is where it fails.

Error

  Could not find a version that satisfies the requirement pandas (from versions: )
No matching distribution found for pandas
21/03/03 15:56:53 INFO VirtualEnvFactory: Start to setup virtualenv...
21/03/03 15:56:53 INFO VirtualEnvFactory: Running command:/usr/bin/virtualenv -p python --system-site-packages virtualenv_application_1609789675521_0076_0
21/03/03 15:56:53 ERROR Executor: Exception in task 1.0 in stage 3.0 (TID 4)
java.lang.RuntimeException: Failed to run command: virtualenv_application_1609789675521_0076_0/bin/python -m pip install pandas
    at org.apache.spark.api.python.VirtualEnvFactory.org$apache$spark$api$python$VirtualEnvFactory$$execCommand(VirtualEnvFactory.scala:120)
    at org.apache.spark.api.python.VirtualEnvFactory$$anonfun$setupVirtualEnv$6.apply(VirtualEnvFactory.scala:98)
    at org.apache.spark.api.python.VirtualEnvFactory$$anonfun$setupVirtualEnv$6.apply(VirtualEnvFactory.scala:86)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.api.python.VirtualEnvFactory.setupVirtualEnv(VirtualEnvFactory.scala:86)
    at org.apache.spark.api.python.PythonWorkerFactory.<init>(PythonWorkerFactory.scala:85)
    at org.apache.spark.SparkEnv$$anonfun$createPythonWorker$1.apply(SparkEnv.scala:118)
    at org.apache.spark.SparkEnv$$anonfun$createPythonWorker$1.apply(SparkEnv.scala:118)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:109)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/03/03 15:56:53 INFO CoarseGrainedExecutorBackend: Got assigned task 5
21/03/03 15:56:53 INFO Executor: Running task 1.1 in stage 3.0 (TID 5)

screenshot from Spark UI


Update: My Spark machines cannot access the internet, and that is what is causing the issue. Is there any way to configure the virtualenv to use a proxy?

21/03/03 16:57:12 INFO VirtualEnvFactory: Running command:virtualenv_application_1609789675521_0077_0/bin/python -m pip install pandas
  Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'NewConnectionError('<pip._vendor.requests.packages.urllib3.connection.VerifiedHTTPSConnection object at 0x7f527ee470f0>: Failed to establish a new connection: [Errno 101] Network is unreachable',)': /simple/pandas/
  Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'NewConnectionError('<pip._vendor.requests.packages.urllib3.connection.VerifiedHTTPSConnection object at 0x7f527ee47748>: Failed to establish a new connection: [Errno 101] Network is unreachable',)': /simple/pandas/
  Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'NewConnectionError('<pip._vendor.requests.packages.urllib3.connection.VerifiedHTTPSConnection object at 0x7f527ee47710>: Failed to establish a new connection: [Errno 101] Network is unreachable',)': /simple/pandas/
  Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'NewConnectionError('<pip._vendor.requests.packages.urllib3.connection.VerifiedHTTPSConnection object at 0x7f527ee47f60>: Failed to establish a new connection: [Errno 101] Network is unreachable',)': /simple/pandas/
  Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'NewConnectionError('<pip._vendor.requests.packages.urllib3.connection.VerifiedHTTPSConnection object at 0x7f527ee47a20>: Failed to establish a new connection: [Errno 101] Network is unreachable',)': /simple/pandas/
  Could not find a version that satisfies the requirement pandas (from versions: )
No matching distribution found for pandas
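
One thing that might be worth trying, as an untested sketch: Spark's `spark.executorEnv.<NAME>` setting adds an environment variable to every executor process, so the proxy could be passed through the session config instead of `os.environ` on the driver. Whether the `pip install` started by `VirtualEnvFactory` actually picks up these variables is an assumption here, not something confirmed by the logs. The helper `with_executor_proxy` is a hypothetical name used only for illustration, and the proxy URL is the one from the code above.

```python
import json

# Session configuration copied from the question.
base_conf = {
    "spark.pyspark.python": "python",
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.type": "native",
    "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",
}


def with_executor_proxy(conf, proxy_url):
    """Return a copy of conf that also exports proxy variables to executors.

    Hypothetical helper: it only builds the dictionary and does not talk to Spark.
    """
    updated = dict(conf)
    for var in ("http_proxy", "https_proxy"):
        # spark.executorEnv.<NAME> adds environment variable NAME to each executor process.
        updated["spark.executorEnv." + var] = proxy_url
    return updated


# Assumption: the executors can reach the same proxy as the driver.
session_conf = {"conf": with_executor_proxy(base_conf, "http://10.10.10.1:443")}
print(json.dumps(session_conf, indent=4))
```

The printed JSON has the same shape as the session config at the top of the question, so it could be used in its place when the session is created.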
Gaurang Shah
  • Yes, `toPandas` is executed on the driver; check [this](https://stackoverflow.com/questions/30983197/requirements-for-converting-spark-dataframe-to-pandas-r-dataframe/30991297#30991297). – anky Mar 03 '21 at 16:22
  • @anky Thanks, that makes sense, since pandas is not distributed. However, if I can see the pandas library among the installed packages, why is it trying to install it again on the executor nodes? – Gaurang Shah Mar 03 '21 at 16:26
