
Based on the code in https://alexwlchan.net/2019/09/unpacking-compressed-archives-in-scala/ I have a UDF in Scala to extract tar.gz files (of size 6 GB) which contain up to 1000 JSON documents each:

    val udfExtract = udf((data: Array[Byte]) => Unpacker(data): Map[String, String])

The UDF returns a map of key (JSON file name) and value (actual JSON content) pairs, i.e. up to 1000 pairs for each tar.gz file.
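
For reference, a minimal sketch of what such an Unpacker might look like, based on the approach in the linked post (Apache Commons Compress); the actual implementation may differ, and the names here are illustrative:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.nio.charset.StandardCharsets
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream

    // Hypothetical sketch: read a gzipped tar from a byte array and return
    // a Map of (entry name -> entry content as UTF-8 text).
    object Unpacker {
      def apply(data: Array[Byte]): Map[String, String] = {
        val tar = new TarArchiveInputStream(
          new GzipCompressorInputStream(new ByteArrayInputStream(data)))
        val result = scala.collection.mutable.Map.empty[String, String]
        var entry = tar.getNextTarEntry
        while (entry != null) {
          if (!entry.isDirectory) {
            // copy the current entry fully into memory
            val buf = new Array[Byte](8192)
            val bos = new ByteArrayOutputStream()
            var n = tar.read(buf)
            while (n != -1) { bos.write(buf, 0, n); n = tar.read(buf) }
            result(entry.getName) = new String(bos.toByteArray, StandardCharsets.UTF_8)
          }
          entry = tar.getNextTarEntry
        }
        tar.close()
        result.toMap
      }
    }

Note that every entry is materialised as a String in the returned Map, so the entire decompressed archive lives on the heap at once.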

Binary File Schema

    val binaryFileSchema = StructType(Array(
      StructField("path", StringType, true),
      StructField("modificationTime", TimestampType, true),
      StructField("length", LongType, true),
      StructField("content", BinaryType, true)))

Load Data where the input path contains 24 tar.gz files

    val binaryDF = spark.read
      .format("binaryFile")
      .option("pathGlobFilter", "*.tar.gz")
      .schema(binaryFileSchema)
      .load(input_path)
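
As a quick sanity check, the file metadata reported by the binaryFile source can be inspected before running the UDF (only the metadata columns are selected, not the binary content):

    binaryDF.select($"path", $"length").show(false)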

Extract the content using the UDF and apply a predefined schema (MySchema) to parse the extracted content

    val parseDF = binaryDF.withColumn("extracted_content", udfExtract($"content"))
      .withColumn("file_path", $"path")
      .select($"file_path", explode($"extracted_content")).toDF("file_path", "key", "value")
      .withColumn("my_json", from_json($"value", MySchema))
      .persist()
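
For reference, after the explode and from_json steps the DataFrame should have roughly this shape (the fields inside my_json are whatever MySchema defines):

    parseDF.printSchema()
    // root
    //  |-- file_path: string
    //  |-- key: string
    //  |-- value: string
    //  |-- my_json: struct (fields defined by MySchema)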

Write to Table

    parseDF.select($"my_json")
      .write.format("delta").mode("append")
      .saveAsTable("default.raw_data")

When executing the code I get OutOfMemoryError: Java heap space

I suspect this OOM error is caused by using the above UDF to extract the huge compressed files (about 50 GB after decompression), or it may be related to garbage collection. How can I resolve this?
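
As a rough back-of-the-envelope estimate of what a single task has to hold (assuming the ~50 GB decompressed figure applies to one archive), the numbers already exceed a typical executor heap:

    // rough per-task estimate, assuming ~50 GB decompressed per 6 GB archive
    val compressedBytes = 6L  * 1024 * 1024 * 1024  // "content" column for one tar.gz
    val extractedBytes  = 50L * 1024 * 1024 * 1024  // strings in the Map returned by the UDF
    val perTaskBytes    = compressedBytes + extractedBytes
    // ~56 GB materialised by one task before persist() even tries to cache the rows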

Below is the stack trace:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in stage 2.0 (TID 59, 10.24.48.68, executor 1): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/mnt/raw_data_1583107200.tar.gz.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:331)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:310)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:397)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:250)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$3$$anon$1.hasNext(InMemoryRelation.scala:137)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1235)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1226)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1161)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1226)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1045)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:315)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:170)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1$$anonfun$apply$1.apply(TransactionalWriteEdge.scala:160)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1$$anonfun$apply$1.apply(TransactionalWriteEdge.scala:133)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:243)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:173)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1.apply(TransactionalWriteEdge.scala:133)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1.apply(TransactionalWriteEdge.scala:90)
    at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:428)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:409)
    at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
    at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:69)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
    at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:344)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:82)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:82)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$class.writeFiles(TransactionalWriteEdge.scala:90)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:82)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWrite$class.writeFiles(TransactionalWrite.scala:110)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:82)
    at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:111)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:119)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:93)
    at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:428)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:409)
    at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
    at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:69)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
    at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:344)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:45)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:45)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:93)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:193)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:189)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:117)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:711)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:243)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:173)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:711)
    at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:509)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:488)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:431)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-392784447715347:4)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-392784447715347:63)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-392784447715347:65)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-392784447715347:67)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-392784447715347:69)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-392784447715347:71)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw$$iw.<init>(command-392784447715347:73)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw$$iw.<init>(command-392784447715347:75)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw$$iw.<init>(command-392784447715347:77)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$$iw.<init>(command-392784447715347:79)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read.<init>(command-392784447715347:81)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$.<init>(command-392784447715347:85)
    at line3f718bbfd7704a88872ddbd33faa7db546.$read$.<clinit>(command-392784447715347)
    at line3f718bbfd7704a88872ddbd33faa7db546.$eval$.$print$lzycompute(<notebook>:7)
    at line3f718bbfd7704a88872ddbd33faa7db546.$eval$.$print(<notebook>:6)
    at line3f718bbfd7704a88872ddbd33faa7db546.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.databricks.sql.io.FileReadException: Error while reading file dbfs:/mnt/raw_data_1583107200.tar.gz.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:331)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:310)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:397)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:250)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$3$$anon$1.hasNext(InMemoryRelation.scala:137)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1235)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1226)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1161)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1226)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1045)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:315)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space

Comments:
  • Can you post your Spark-submit command? – Vijay_Shinde Dec 24 '20 at 06:23
  • https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space – Vijay_Shinde Dec 24 '20 at 06:39
  • extract the files using bash (or other shells), then process the extracted JSONs using Spark. Spark is not designed for extracting archives - that's the job of shells. – mck Dec 24 '20 at 08:09
  • @Vijay - I am running this inside a Databricks notebook with a cluster config of 1 driver (28 GB RAM, 4 cores) and 10 workers, each with 28 GB RAM and 4 cores. Let me know if I can provide any other Spark config details – puligun Dec 24 '20 at 13:11
  • @mck - I was planning to load the tar.gz files in Spark and extract them in memory to avoid extracting the files to disk, since the total size of the extracted files will be approx. 8 times the compressed size. – puligun Dec 24 '20 at 13:19
  • @Sandesh this does not make sense - if it can't fit into the disk, then it definitely can't fit into memory. Clusters almost always have more disk space than memory – mck Dec 24 '20 at 13:21
  • @mck - the tar.gz files are located in Azure Data Lake, and I am using Spark to read the files from ADL. Rather than extracting the files to ADL and then loading the JSON files, I was planning to load the tar.gz files and extract them using Spark – puligun Dec 24 '20 at 13:58
