I overwrite existing parquet partition with apache spark 2.2 everytime when there is a request to update entire partition. Data is skewed. So some partitions are huge. Initially, I don't repartition the dataframe (created from source data) and try to write it with following psuedo spark sql.
insert overwrite table mytable partition(a=123, b=234) select c1, c2, c3 from mydataframe where a=123 and b=234
(I could have used dynamic partitioning sql update but it doesn't change anything about the issue.)
Now, because this partition have huge data (over 5g) I get java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
It seem to indicate that block size went over 2g spark limit. Common Solution suggested for it is to increase number of partition of your dataframe.
If I increase number of partitions in dataframe then I get
CommitDeniedException: attempt Not committed because the driver did not authorize commit
and
TaskCommitDenied (Driver denied task commit) for job: 124, partition: 39, attemptNumber: 0
Following is the order of exception I see in most of the executor logs. Shuffle failed, then CommitDeniedException and then OutOfMemory towards end where it dies
19/03/15 12:42:52 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
19/03/15 13:10:22 ERROR util.Utils: Aborting task
org.apache.spark.executor.CommitDeniedException: attempt_20190315131022_0085_m_000042_360: Not committed because the driver did not authorize commit
at org.apache.spark.mapred.SparkH
19/03/15 13:34:48 ERROR util.Utils: Uncaught exception in thread driver-heartbeater
java.lang.OutOfMemoryError: Java heap space
I can't figure out why I am getting TaskCommitDenied
. spark.speculation
is disabled by default.
I think doing dataframe repartitioning may be causing this issue. Correct me if I am wrong but - Because I am OVERwriting data to phsycial parquet partitions, dataframe partition count also has to be same otherwise multiple task probably try to overwrite it same time. If your dataframe have more partitions then physical partition then that might happen.
so how do I overwrite huge parquet partition using spark-sql?
Additional Stacktrace (snippet from one executor, it happens on multiple nodes):
19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 10.443414 ms
19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 29.516827 ms
19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 51.646654 ms
19/03/16 15:39:08 INFO parquet.MapredParquetOutputFormat: creating new record writer...org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat@4c0e1ce4
19/03/16 15:39:08 INFO write.ParquetRecordWriterWrapper: initialize serde with table properties.
19/03/16 15:39:08 INFO write.ParquetRecordWriterWrapper: creating real writer to write at maprfs:/mdsMyProject/hive/warehouse/MyProjectint4242.db/table_12341factTable/.hive-staging_hive_2019-03-16_14-57-17_337_1173214459531843754-3/-ext-10000/_temporary/0/_temporary/attempt_20190316153908_0020_m_000121_0/business_id=1224/period_id=684039130/part-00121-b2416c44-6e2c-4f27-993e-0328fd64ea2e.c000
19/03/16 15:39:09 INFO write.ParquetRecordWriterWrapper: real writer: parquet.hadoop.ParquetRecordWriter@5659d6bb
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
19/03/16 15:42:36 INFO mapred.SparkHadoopMapRedUtil: attempt_20190316153908_0020_m_000121_0: Not committed because the driver did not authorize commit
19/03/16 15:42:36 ERROR util.Utils: Aborting task
org.apache.spark.executor.CommitDeniedException: attempt_20190316153908_0020_m_000121_0: Not committed because the driver did not authorize commit
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:83)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:171)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:261)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:262)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/03/16 15:42:36 WARN output.FileOutputCommitter: Could not delete maprfs:/mdsMyProject/hive/warehouse/MyProjectint4242.db/table_12341factTable/.hive-staging_hive_2019-03-16_14-57-17_337_1173214459531843754-3/-ext-10000/_temporary/0/_temporary/attempt_20190316153908_0020_m_000121_0
19/03/16 15:42:36 ERROR datasources.FileFormatWriter: Job job_20190316153908_0020 aborted.
19/03/16 15:52:03 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1312
19/03/16 15:52:03 INFO executor.Executor: Running task 7.0 in stage 19.4 (TID 1312)
19/03/16 15:52:03 INFO spark.MapOutputTrackerWorker: Updating epoch to 42 and clearing cache
19/03/16 15:52:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 40
19/03/16 15:52:03 INFO client.TransportClientFactory: Successfully created connection to /10.250.70.17:33652 after 2 ms (0 ms spent in bootstraps)
19/03/16 15:52:03 INFO memory.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 17.1 KB, free 11.0 GB)
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 2, fetching them
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.250.70.17:32803)
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 2, fetching them
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Got the output locations
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 32 blocks
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 32 blocks
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 9 ms
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 14 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 153.597538 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 21.508826 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 13.097823 ms
19/03/16 16:05:18 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/03/16 16:05:18 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:19 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:19 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:20 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:20 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:21 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:22 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:23 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:23 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:24 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:25 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:25 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:26 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:27 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 INFO util.ShutdownHookManager: Shutdown hook called
19/03/16 16:05:29 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:30 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:30 INFO util.ShutdownHookManager: Deleting directory /tmp/hadoop-myCompany/nm-local-dir/usercache/myCompany/appcache/application_1552538107854_5893/spark-fc8ff151-20b7-46b2-b1d6-7cc047f1b495
19/03/16 16:05:31 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
^@^Fstdout^@^C124#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 18350"...
Sometime I see more stack trace when OOM happens
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 19.177788 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 12.840521 ms
19/03/16 16:05:15 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 10.0 in stage 20.2 (TID 1331)
java.lang.OutOfMemoryError: Java heap space
at java.io.FileInputStream.close(FileInputStream.java:326)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at org.apache.spark.network.util.LimitedInputStream.close(LimitedInputStream.java:125)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:438)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/03/16 16:05:15 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 24.0 in stage 20.2 (TID 1346)
19/03/16 16:05:15 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 24.0 in stage 20.2 (TID 1346)
java.lang.OutOfMemoryError: Java heap space
at java.lang.Integer.valueOf(Integer.java:832)
at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
19/03/16 16:05:15 INFO executor.Executor: Not reporting error to driver during JVM shutdown.
19/03/16 16:05:15 INFO executor.Executor: Not reporting error to driver during JVM shutdown.
19/03/16 16:05:15 ERROR util.SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 1331,5,main]
19/03/16 16:05:15 ERROR util.SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 1346,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.lang.Integer.valueOf(Integer.java:832)
at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
19/03/16 16:05:16 INFO util.ShutdownHookManager: Shutdown hook called
19/03/16 16:05:16 INFO util.ShutdownHookManager: Deleting directory /tmp/hadoop-myCompany/nm-local-dir/usercache/myCompany/appcache/application_1552538107854_5893/spark-0d69a896-fbdf-4627-bc0b-7384c9421fa0
^@^Fstdout^@^C122#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 930"...