In Spark 3.0.0, your error is triggered by this piece of code (from TaskSetManager.scala):
/**
 * Check whether has enough quota to fetch the result with `size` bytes
 */
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
  totalResultSize += size
  calculatedTasks += 1
  if (maxResultSize > 0 && totalResultSize > maxResultSize) {
    val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
      s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " +
      s"(${Utils.bytesToString(maxResultSize)})"
    logError(msg)
    abort(msg)
    false
  } else {
    true
  }
}
As you can see, if maxResultSize == 0, you will never get the error you are getting. A bit higher up in the same file, you can see that maxResultSize comes from config.MAX_RESULT_SIZE, which in turn is defined by spark.driver.maxResultSize in this piece of code:
private[spark] val MAX_RESULT_SIZE = ConfigBuilder("spark.driver.maxResultSize")
  .doc("Size limit for results.")
  .version("1.2.0")
  .bytesConf(ByteUnit.BYTE)
  .createWithDefaultString("1g")
Conclusion
You are trying the correct thing! Setting spark.driver.maxResultSize to 0 also works in Spark 3.0. But as you can see in your error message, your config.MAX_RESULT_SIZE is still equal to the default value of 1g (1024 MB).
This means that your configuration is probably not coming through. I would investigate your whole setup: how are you submitting your application? What is your master? Is your spark.sql.execution.arrow.pyspark.enabled config coming through?
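For example, assuming you are on PySpark and have a running session in spark, you can check what actually reached the driver:

# If this prints "not set", the key never made it into the SparkConf and the
# scheduler falls back to the 1g default; if it prints "0", your setting arrived.
print(spark.sparkContext.getConf().get("spark.driver.maxResultSize", "not set"))

# SQL/runtime configs such as the Arrow setting can be read like this:
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))

And if you use spark-submit, passing the value explicitly on the command line is a good sanity check (your_app.py is a placeholder):

spark-submit --conf spark.driver.maxResultSize=0 your_app.py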