
I am trying to convert a Spark DataFrame into a pandas DataFrame. I have a sufficiently large driver, and I am setting the spark.driver.maxResultSize value like this:

spark = (
        SparkSession
        .builder
        .appName('test')
        .enableHiveSupport()
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.driver.maxResultSize","0")
        .getOrCreate()
    )
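
The conversion itself is just a `toPandas()` call; roughly like this (the source query below is a placeholder, not my real one):

df = spark.sql("SELECT * FROM some_large_table")  # placeholder query
pdf = df.toPandas()  # the step that pulls all task results back to the driver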

But the job is failing with the following error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of XXXX tasks (1026.4 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)

1 Answer


In Spark 3.0.0, your error is triggered by this piece of code (in TaskSetManager.scala):

  /**
   * Check whether has enough quota to fetch the result with `size` bytes
   */
  def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
        s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " +
        s"(${Utils.bytesToString(maxResultSize)})"
      logError(msg)
      abort(msg)
      false
    } else {
      true
    }
  }

As you can see, if maxResultSize == 0 you can never get the error you're getting. A bit higher up, you can see that maxResultSize comes from config.MAX_RESULT_SIZE, which is in turn defined by spark.driver.maxResultSize in this piece of code:

  private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
    .doc("Size limit for results.")
    .version("1.2.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("1g")

Conclusion

You are trying the correct thing! Setting spark.driver.maxResultSize to 0 (meaning unlimited) also works in Spark 3.0. But as your error message shows, your config.MAX_RESULT_SIZE is still equal to the default value of 1024 MiB (1g).

This means that your configuration is probably not coming through. I would investigate your whole setup: how are you submitting your application? What is your master? Is your spark.sql.execution.arrow.pyspark.enabled config coming through?
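
One quick sanity check (just a sketch, assuming you can run code in that same notebook session) is to read the effective values back from the running session. Driver settings like this one are fixed when the SparkContext is created, so a builder that merely attaches to an already-running session won't change them:

# If this still prints the default ("not set (1g default)"), the builder config
# never reached the driver, e.g. because getOrCreate() attached to an existing session.
print(spark.sparkContext.getConf().get("spark.driver.maxResultSize", "not set (1g default)"))
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled", "not set"))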

  • I am running it from a jupyter notebook that submits the job in a kubernetes cluster – Ayan Biswas Mar 11 '23 at 13:35
  • Where are you defining your kubernetes configurations? Like `spark.kubernetes.container.image` or `spark.kubernetes.namespace`? Also, where are you defining your master? I feel like you'll need to add the `spark.driver.maxResultSize` configuration next to those configs (but it's hard to say without knowing your setup properly). – Koedlt Mar 11 '23 at 20:52
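
For what it's worth, a minimal sketch of that last suggestion, with placeholder Kubernetes values (the image, namespace and master URL below are not from the question):

from pyspark.sql import SparkSession

# Hypothetical setup: put spark.driver.maxResultSize in the same SparkConf as the
# Kubernetes settings, before the session (and therefore the driver) is created.
spark = (
    SparkSession.builder
    .appName("test")
    .master("k8s://https://<api-server-host>:6443")               # placeholder master URL
    .config("spark.kubernetes.container.image", "<your-image>")   # placeholder image
    .config("spark.kubernetes.namespace", "<your-namespace>")     # placeholder namespace
    .config("spark.driver.maxResultSize", "0")                    # 0 = no limit
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .enableHiveSupport()
    .getOrCreate()
)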