
I need to convert a PySpark DataFrame to pandas with toPandas(). There are 68,467 rows. I checked some solutions here and learned that Arrow can help, so I installed pyarrow:

>conda install -c conda-forge pyarrow

I enabled Arrow in my PySpark code:

>SparkConf().set("spark.sql.execution.arrow.enabled", "true")

but I still get an error. What else am I missing?

Here is my code:

SparkConf().set("spark.sql.execution.arrow.enabled", "true")

accum=saccum.select("*").toPandas()
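For context, here is a minimal sketch of how the Arrow flag could be attached to the SparkSession that actually runs toPandas(); the builder call and table name below are illustrative, not my exact project.py:

from pyspark.sql import SparkSession

# Illustrative sketch: set the Arrow flag on the session itself, rather than
# creating a standalone SparkConf that is never passed to any session.
spark = (SparkSession.builder
         .appName("project")
         .config("spark.sql.execution.arrow.enabled", "true")
         .getOrCreate())

saccum = spark.table("saccum_source")   # hypothetical: however saccum is actually built
accum = saccum.select("*").toPandas()   # Arrow is only used if the flag is set on this session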

My spark-submit command:

>spark-submit --driver-memory 20g --num-executors 10 --executor-cores 4 --executor-memory 20g --conf spark.default.parallelism=4 --conf spark.driver.maxResultSize=50g --conf "spark.local.dir=./pyspark_tmp" project.py

The error has three main parts:

1. ERROR Utils: Uncaught exception in thread stdout writer for python
2. java.lang.OutOfMemoryError: Java heap space
3. ERROR Utils: Uncaught exception in thread pool-1-thread-1

The relevant log excerpt:



Total  68467  of rows in Pyspark DataFrame
19/07/25 04:12:22 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:204)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:52)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
Exception in thread "stdout writer for python" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage11.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)

........
19/07/25 04:14:29 ERROR Utils: Uncaught exception in thread pool-1-thread-1
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:244)
        at org.apache.spark.storage.BlockManagerMaster.stop(BlockManagerMaster.scala:236)
        at org.apache.spark.SparkEnv.stop(SparkEnv.scala:91)
        at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1947)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1946)
        at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:573)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
  • You're running out of memory. `toPandas` and any `collect`-type operation funnels all of the data to the master node. You *can* try some of the techniques outlined [here](https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space), but the better question to ask is why you need to bring the data to pandas at all; you'll lose all the benefits of parallelization. If you *still* need to use pandas, you have some options: 1) increase memory, 2) reduce the data somehow (sample or drop some columns), 3) write the data to HDFS and then copy it down to local (then load the csv); a sketch of options 2 and 3 follows these comments. – pault Jul 25 '19 at 17:57
  • Adding to the above: if you have a small dataset, just use pandas on a single machine rather than Spark, which is for distributing a big dataset. I would never recommend using `toPandas` other than for taking a sample of the data to run some plotting in `seaborn` or `matplotlib`. You could also use `pandas_udf`, which has Arrow integration for columnar optimization, if you need to stay in Spark. – thePurplePython Jul 25 '19 at 18:01
  • Hi @pault, I used PySpark to finish the main job, and the dataset size is reduced significantly from the beginning. Now I am at the point where I need to add lots of columns and then upload the result to Vertica. Pandas is easier, and my dataset is now only 68k rows. I even set the driver memory to 16g and still could not convert with toPandas(). – Helen Z Jul 25 '19 at 18:56
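Following the suggestions in the comments above, here is a rough sketch of what reducing the data or going through storage could look like; the column names and paths are hypothetical, not from the actual job:

import pandas as pd

# Option 2 from the comments: shrink the DataFrame before collecting it to the driver.
small = saccum.select("col_a", "col_b")          # hypothetical: keep only the columns needed
sample_pdf = small.sample(False, 0.1).toPandas() # 10% sample without replacement; or .limit(n)

# Option 3 from the comments: write the result out from Spark, then load it with pandas locally.
saccum.write.mode("overwrite").parquet("hdfs:///tmp/saccum_out")  # hypothetical HDFS path
accum = pd.read_parquet("./saccum_out_local")                     # after copying the files down locally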

0 Answers