
I'm invoking PySpark with Spark 2.0 in local mode with the following command:

pyspark --executor-memory 4g --driver-memory 4g

The input dataframe is read from a TSV file and has 580K rows and 28 columns. I'm doing a few operations on the dataframe, and when I then try to export it to a TSV file I get the error below.

df.coalesce(1).write.save("sample.tsv", format="csv", header="true", delimiter="\t")

Any pointers on how to get rid of this error? I can easily display the dataframe or count its rows.

The output dataframe has 3,100 rows and 23 columns.

Error:

Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:129)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.fetchNextRow(WindowExec.scala:300)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.<init>(WindowExec.scala:309)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:289)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:288)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    ... 8 more

Driver stacktrace:
ML_Passion
  • Have you tried without `coalesce()`? Clearly you are running out of memory. What is your configuration? – gsamaras Aug 15 '16 at 19:20
  • I have tried without coalesce and it runs fine. My configuration is an Intel i7 with 16 GB RAM and Windows 7 Professional. I have exported other files in the past with 0.5M rows and 15-20 columns using the same approach and it worked fine. – ML_Passion Aug 15 '16 at 19:23
  • How did you solve this issue? – zonna Jul 01 '22 at 08:50

5 Answers


I believe the cause of this problem is coalesce(): even though it avoids a full shuffle (as repartition() would do), it still has to shrink the data into the requested number of partitions.

Here you are asking all the data to fit into one partition, so a single task (and only that task) has to work with all the data, which can push its container past its memory limits.

So either ask for more than one partition, or avoid coalesce() in this case.
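For example, a minimal sketch of both options, using the DataFrameWriter.csv shortcut that Spark 2.0 provides (the output paths and the partition count of 8 are placeholders):

# Option 1: skip coalesce() entirely; Spark writes one part-file per partition.
df.write.csv("sample_tsv", header=True, sep="\t")

# Option 2: shrink to a handful of partitions instead of one,
# so no single task has to hold all the data in memory.
df.coalesce(8).write.csv("sample_tsv_8parts", header=True, sep="\t")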


Otherwise, you could try the solutions in the links below for increasing your memory configuration:

  1. Spark java.lang.OutOfMemoryError: Java heap space
  2. Spark runs out of memory when grouping by key
gsamaras
  • I got the same issue: I don't use coalesce and already increased the executor memory, but I keep getting the same error, "org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0". Not only the same issue, but even the same value, 16384. Do you have any idea? – zonna Jul 01 '22 at 08:49

In my case, replacing coalesce(1) with repartition(1) worked.
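A minimal sketch of that change, keeping the writer options from the question:

# repartition(1) adds a shuffle step, but the upstream work still runs in parallel
# before everything is merged into the single output partition.
df.repartition(1).write.save("sample.tsv", format="csv", header="true", delimiter="\t")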


The problem for me was indeed coalesce(). Instead of exporting the file with coalesce(), I wrote it out with df.write.parquet("testP"), then read the file back and exported that with coalesce(1).
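A rough sketch of that workaround, assuming the spark session from the pyspark shell (the paths are placeholders):

# Write in parallel first; Parquet keeps the existing partitioning,
# so no single task has to process everything.
df.write.parquet("testP")

# Read the materialized result back; coalescing it now is cheap because
# no heavy upstream computation gets squeezed into the one remaining task.
spark.read.parquet("testP").coalesce(1) \
    .write.save("sample.tsv", format="csv", header="true", delimiter="\t")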

Hopefully it works for you as well.

bobo32

As stated in other answers, use repartition(1) instead of coalesce(1). The reason is that repartition(1) ensures that upstream processing is done in parallel (multiple tasks/partitions), rather than on only one executor.

To quote the Dataset.coalesce() Spark docs:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition(1) instead. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
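You can see this difference in the physical plans (a quick check, not part of the original answer): repartition(1) inserts an Exchange operator for the shuffle, while coalesce(1) does not.

df.coalesce(1).explain()     # no Exchange: upstream work collapses into one task
df.repartition(1).explain()  # shows an Exchange, i.e. the extra shuffle step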

Mark Rajcok

In my case the driver was smaller than the workers, and the issue was resolved by making the driver larger.
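In local and client mode the driver's memory must be set when the JVM is launched; a sketch of the kind of change that helped here (the 8g value is just an example):

pyspark --driver-memory 8g --executor-memory 4g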

Farshad Javadi