0

I'm getting the following error in my Spark job (Spark version 2.4.0):

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1346 tasks (1024.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2065)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2086)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2105)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2130)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:237)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$createInMemoryFileIndex(DataSource.scala:533)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:371)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:644)
    at common.infra.StreamProviderImpl.getData(StreamProvider.scala:44)
    at common.MachineProfileReader$.readMachineInfo(MachineProfileReader.scala:20)
    at common.MachineProfileReader$.readLatestMachineInfoOfUniqueMachines(MachineProfileReader.scala:61)
    at common.MachineProfileReader$.enrichWithMachineInfo(MachineProfileReader.scala:89)
    at client.telemetry.CPUPerformanceKPIJob$$anonfun$main$1.apply$mcV$sp(CPUPerformanceKPIJob.scala:193)
    at client.telemetry.CPUPerformanceKPIJob$$anonfun$main$1.apply(CPUPerformanceKPIJob.scala:26)
    at client.telemetry.CPUPerformanceKPIJob$$anonfun$main$1.apply(CPUPerformanceKPIJob.scala:26)
    at client.common.entities.SparkJobExecutionStatus$.run(SparkJobExecutionStatus.scala:22)
    at client.telemetry.CPUPerformanceKPIJob$.main(CPUPerformanceKPIJob.scala:26)
    at client.telemetry.CPUPerformanceKPIJob.main(CPUPerformanceKPIJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)

The root cause, as I understand it from the stack trace, is at common.infra.StreamProviderImpl.getData(StreamProvider.scala:44):

val telemetry = spark.read.parquet(unifiedPathsFromAllStorages: _*)

For some reason, as the stack trace shows, this line causes a call to org.apache.spark.rdd.RDD.collect down the line. This, in turn, collects all the data into the driver, causing it to exceed the driver.maxResultSize.

Any idea why that happens? I haven't called collect() myself. Would appreciate a solution, other than simply increasing maxResultSize, which may lead to out of memory errors.

Koedlt
  • 4,286
  • 8
  • 15
  • 33
Royar
  • 611
  • 6
  • 21
  • It is not loading the data, looks like it is just [trying to](https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L207C22-L207C37) collect the info for all the files you are trying to read. Could it be that you just have way too many paths? Try to coalesce them into larger partitions perhaps? – Dima Jul 11 '23 at 12:11
  • Does this answer your question? [What is spark.driver.maxResultSize?](https://stackoverflow.com/questions/39087859/what-is-spark-driver-maxresultsize) – mazaneicha Jul 11 '23 at 13:27

1 Answers1

2

If you have a closer look at your stack trace, you see that the collect operation is invocated in org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:237).

So let's have a quick look at the relevant lines in InMemoryFileIndex.scala in the source code for version 2.4.0 (I only pasted a part of it, for the sake of keeping this as short as possible):

  /**
   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
   * on the number of paths to list.
   *
   * This may only be called on the driver.
   *
   * @return for each input path, the set of discovered files for the path
   */
  private[sql] def bulkListLeafFiles(
      paths: Seq[Path],
      hadoopConf: Configuration,
      filter: PathFilter,
      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

    // Short-circuits parallel listing when serial listing is likely to be faster.
    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
      return paths.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
      }
    }

    ...
      sparkContext
        .parallelize(serializedPaths, numParallelism)
        .mapPartitions { pathStrings =>
          val hadoopConf = serializableConfiguration.value
          pathStrings.map(new Path(_)).toSeq.map { path =>
            (path, listLeafFiles(path, hadoopConf, filter, None))
          }.iterator
        }.map { case (path, statuses) =>
          ...
          }
          ...
      }.collect()
   ...

So we see that indeed, a collect operation is being called! But it might not be for the purpose you expect: it is not collecting your actual data onto the driver, but it is collecting an RDD that contains all of the paths that are to be read.

Now what?

It seems like your parquet file has A LOT of subfiles, and doing collect on an RDD that contains these paths seems to be too big. Before trying any of the below possible solutions, I would try to verify the amount of subfiles within your parquet file.

There are multiple things you could try to do to get over this issue:

  1. The simplest is increasing spark.driver.maxResultSize. The default value for this parameter is 1024MB. After having had a look at the amount of subfiles, you might have an idea of how large this value needs to be. Or you could brute force this, and just multiply it by 2 until it works (provided your infrastructure permits this)

  2. One of the bits of code I kept in the above snippet is the underlying snippet.

    // Short-circuits parallel listing when serial listing is likely to be faster.
    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
      return paths.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
      }
    }

As you can see, there is a mechanism at work that short circuits to creation of an RDD to list the files in a parallel fashion. If the amount of paths to be read is smaller than parallelPartitionDiscoveryThreshold, no RDD will be created and no collect operation will take place. The file listing will happen on the driver side.

The documentation on this configuration parameter can be found here:

  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
    buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
      .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
        "of detected paths exceeds this value during partition discovery, it tries to list the " +
        "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
        "LibSVM data sources.")
      .intConf
      .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
        "files at driver side must not be negative")
      .createWithDefault(32)

So the second option would be to make spark.sql.sources.parallelPartitionDiscovery.threshold VERY large (larger than your amount of paths to be read, which is probably large in this case), removing the creation of this RDD and thus this collect operation. This might make your reading operation slower, as you won't benefit from the parallel listing capacity that is provided.

I would do the following in chronological order:

  • Verify whether it's expected that your parquet file has this big amount of subfiles
  • If the previous makes sense, try increasing spark.driver.maxResultSize until it works (option 1 above)
  • If the previous does not work, try option 2 above
Koedlt
  • 4,286
  • 8
  • 15
  • 33