
I am running the code below in Spark to compare data stored in a CSV file against a Hive table. My data file is about 1.5 GB and has about 0.2 billion rows. When I run the code, I get a GC overhead limit exceeded error. I am not sure why I am getting this error, and I have searched various articles.

The error occurs at the Test 3 step, `sourceDataFrame.except(targetRawData).count > 0`. I am not sure whether there is a memory leak. How can I debug and resolve this?

import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    //val sourceDataLocation = "hdfs://localhost:9000/sourcec.txt"
    val sourceDataLocation = "s3a://rbspoc-sas/sas_valid_large.txt"
    val targetTableName = "temp_TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here

    // Test 1 - Validate the Structure
       println("Validating the table structure...")
       var startTime = getTimestamp()
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       val sourceData = sourceRawCsvData
         .mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
         .map(_.split(",").toList)
         .map(row)

       val sourceDataFrame = spark.createDataFrame(sourceData,schema)
       //val sourceDataFrame = sourceDataFrame.toDF(sourceDataFrame.columns map(_.toLowerCase): _*)

       val sourceSchemaList = flatten(sourceDataFrame.schema).map(r => r.dataType.toString).toList
       val targetSchemaList = flatten(targetRawData.schema).map(r => r.dataType.toString).toList
       var endTime = getTimestamp()
       if (sourceSchemaList.diff(targetSchemaList).length > 0) {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed StructureValidation. ")
           // Force exit here if needed
          // sys.exit(1)
       } else {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed StructureValidation. ")
       }

    // Test 2 - Validate the Row count
       println("Validating the Row count...")
       startTime = getTimestamp()
       // check the row count.
       val sourceCount = sourceData.count()
       val targetCount = targetRawData.count()
       endTime = getTimestamp()
       if (sourceCount != targetCount){
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
           // Force exit here if needed
           //sys.exit(1)
         }
       else{
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
         }


    // Test 3 - Validate the data
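    // Note: except() behaves like EXCEPT DISTINCT in SQL: it compares rows on all columns
    // and shuffles the full contents of both datasets, making it the most expensive of these checks.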
    println("Comparing source and target data...")
    startTime = getTimestamp()
    if (sourceDataFrame.except(targetRawData).count > 0 ){
        endTime = getTimestamp()
        // Update the result in the table
        println("Updating DataValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed DataMatch validation")
           // Force exit here if needed
          // sys.exit(1)
         }
       else{
           endTime = getTimestamp()
           println("Updating DataValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed DataMatch validation")
         }

    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - String length validation

  def UpdateResult(tableName: String, startTime: String, endTime: String, returnCode: Int, description: String){
    val insertString = s"INSERT INTO TABLE TestResult VALUES( FROM_UNIXTIME(UNIX_TIMESTAMP()),'$startTime','$endTime','$tableName',$returnCode,'$description')"
    val a = hc.sql(insertString)

    }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToDate(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }


  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

   def convertToDate(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

    def flatten(scheme: StructType): Array[StructField] = scheme.fields.flatMap { f =>
      f.dataType match {
      case struct:StructType => flatten(struct)
      case _ => Array(f)
       }
      }

    def getTimestamp(): String = {
        val now = java.util.Calendar.getInstance()
        val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        timestampFormat.format(now.getTime())
    }

The exception is below:

17/12/21 05:18:40 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(8,0,ShuffleMapTask,TaskKilled(stage cancelled),org.apache.spark.scheduler.TaskInfo@78db3052,null)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 8.0 failed 1 times, most recent failure: Lost task 17.0 in stage 8.0 (TID 323, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
  ... 53 elided
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

scala> 17/12/21 05:18:40 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
java.io.IOException: Failed to delete: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
Arun
  • Looks like you are running your Spark job in `local` mode, so all of the execution happens on the driver JVM. When you call `sourceDataFrame.except(targetRawData).count`, the computation that subtracts the target data from the source data and counts the result runs on that same driver JVM, which cannot handle all of the data in one JVM. Run it in `yarn-cluster` mode and it should work. – Amit Kumar Dec 21 '17 at 06:08
  • Or try increasing the driver memory by setting the `--driver-memory` parameter; by default it allocates 2gb of memory. Increase it and run again. – Amit Kumar Dec 21 '17 at 06:13
  • Thanks, Amit, for your response. I am running the code on a single-node cluster, in the spark-shell REPL. How do I change it from `local` to `yarn-cluster` mode, and is this feasible on a single-node cluster? – Arun Dec 21 '17 at 06:14
  • Try `./bin/spark-shell --driver-memory 5g` to increase the driver memory when using the spark-shell. – Amit Kumar Dec 21 '17 at 06:16
  • Thanks, Amit, I will try this. In the meantime, do you know whether the `except()` function uses `driver` memory or `executor` memory for its computation? I am running on a single-node cluster with 8 GB of RAM in total, of which about 2-3 GB is used by the Hadoop daemons, so I was wondering whether to assign more memory to the `driver` or to the `executor`. – Arun Dec 21 '17 at 06:19
  • Where Spark transformations execute depends on the deploy mode you are using. In `local` mode everything runs on the driver. Only with `yarn-client` or `yarn-cluster` mode does the Spark driver spawn executors to compute the tasks. – Amit Kumar Dec 21 '17 at 06:24
  • Use something like this to start your spark-shell in YARN mode: `spark-shell --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g` – Amit Kumar Dec 21 '17 at 06:30
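
Following the comment thread above, a minimal sketch (assuming a spark-shell session on Spark 2.x, where `sc` is predefined) for confirming which master, deploy mode and driver memory the session actually started with:

    // Minimal sketch, assuming a spark-shell session (Spark 2.x) where `sc` is already defined.
    println(s"master        = ${sc.master}")       // e.g. local[*] vs yarn
    println(s"deploy mode   = ${sc.deployMode}")   // client or cluster
    println(s"driver memory = ${sc.getConf.getOption("spark.driver.memory").getOrElse("<not set, Spark default>")}")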

1 Answer


Your Spark process is spending too much time in garbage collection: most of the CPU is consumed by GC and the processing never completes. You are running out of executor memory. You can try the options below:

  • Tune the properties `spark.storage.memoryFraction` and `spark.memory.storageFraction`. You can also set executor memory from the command line, e.g. `spark-submit ... --executor-memory 4096m --num-executors 20`.
  • Or change the GC policy: check which collector is currently in use and switch to G1 by setting `-XX:+UseG1GC`, as sketched below.
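
As an illustration of the second bullet, a minimal sketch (assuming a spark-shell session on Spark 2.x; the property names come from the bullets above, and the memory values are placeholders) for checking which GC collectors and memory settings are currently in effect before changing them:

    // Minimal sketch, assuming a spark-shell session (Spark 2.x) where `sc` is already defined.
    import java.lang.management.ManagementFactory
    import scala.collection.JavaConverters._

    // Which garbage collectors the driver JVM is using right now
    // (e.g. "PS Scavenge"/"PS MarkSweep" for the parallel collector, "G1 Young/Old Generation" for G1).
    ManagementFactory.getGarbageCollectorMXBeans.asScala.foreach(gc => println(gc.getName))

    // Memory-related Spark settings, where explicitly set.
    Seq("spark.driver.memory", "spark.executor.memory", "spark.memory.storageFraction")
      .foreach(k => println(s"$k = ${sc.getConf.getOption(k).getOrElse("<not set, Spark default>")}"))

    // Switching to G1 has to happen when the JVMs are launched, e.g. (placeholder values):
    //   spark-shell --driver-memory 5g --driver-java-options -XX:+UseG1GC \
    //     --conf spark.executor.extraJavaOptions=-XX:+UseG1GC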
Subash
  • I'm facing a similar situation even though the `Spark` UI shows no time spent on *Garbage Collection* [**GC Time = 0 ms**]. – y2k-shubham Apr 03 '18 at 11:07
  • Could you post the whole stack trace, please? The information given is insufficient to track down the issue. – Subash Aug 14 '18 at 13:21
  • I later found out that the *timeout* was related to nested queries being issued to `MySQL` by the `spark.read.jdbc(..)` method. It was something along the lines of [this](http://dangoldin.com/2017/11/07/spark-s-read-jdbc/), but not precisely the same thing. I'll write up a *comprehensive description* in a blog post, or just answer [my original question](https://stackoverflow.com/questions/49149996), describing the issue completely with reproduction steps and resolution, and will let you know then. – y2k-shubham Aug 16 '18 at 10:24