I am running the code below on Azure Databricks DBR 7.3 LTS, Spark 3.0.1, Scala 2.12, on a cluster of 20 to 35 workers of type Standard_E4as_v4 (32.0 GB memory, 4 cores, 1 DBU) and a driver of type Standard_DS5_v2 (56.0 GB memory, 16 cores, 3 DBU).

The aim is to process ~5.5 TB of data

I am facing the following exception: "org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1165 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB" after processing 1163 out of 57071 tasks (148.4 GiB of data) in 6.1 minutes.

I don't do a collect or otherwise transfer data to the driver. Does partitioning the data cause this issue? If so:

  • Is there a better way to partition?
  • How can I solve this issue?

Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Window to rank records per productId by ingestion timestamp, newest first
val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)

// Read the raw JSON input
val jsonDF = spark.read.json("/mnt/myfile")

// Keep only the most recent record per productId
val res = jsonDF
  .withColumn("row", row_number().over(w))
  .where($"row" === 1)
  .drop("row")

res.write.json("/mnt/myfile/spark_output")

I then tried to only load and write the data again, with no transformation, and faced the same issue. Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val jsonDF = spark.read.json("/mnt/myfile")

jsonDF.write.json("/mnt/myfile/spark_output")
– mohsen

1 Answer

The write method sends the result of the write operation for every partition back to the driver, and due to the large volume of data (and the large number of partitions) this exception occurred. Try increasing spark.driver.maxResultSize to see if it works.

From the documentation:

Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.
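
For illustration, a minimal sketch of raising the limit (the 8g value and the app name are assumptions, not values from the question). In a standalone Spark application the limit can be set when the session is built; on Databricks the session already exists when a notebook runs, so the same key/value goes into the cluster's Spark config (Advanced options) instead, e.g. spark.driver.maxResultSize 8g:

import org.apache.spark.sql.SparkSession

// Sketch with assumed values: the limit must be in place before the driver runs any jobs.
// "8g" is an example; increase it incrementally, or use "0" to remove the limit
// entirely (at the risk of driver out-of-memory errors).
val spark = SparkSession.builder()
  .appName("json-dedup")                      // hypothetical app name
  .config("spark.driver.maxResultSize", "8g") // default is 4g
  .getOrCreate()

Increase the value incrementally rather than jumping straight to unlimited, since very large serialized results can still exhaust driver memory.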

This is also useful.

– badger
  • I thought the write operation is a void method (doesn't have a return). 1) What data does it return to the driver? 2) Aren't the workers responsible for executing the write based on the partitions each one holds? 3) Does the driver node's capacity need to be as big as the total data written at the end? – mohsen Nov 03 '20 at 13:13
  • In general I'm talking about the writing process in Spark, which is a heavy process where a lot happens under the hood. Workers do the writing, but they report the result of the write operation (success/fail) back to the driver, and this is necessary to cover the guarantees Spark provides to the programmer (think about what happens if one partition fails during writing). 3 -> No, just incrementally increase this limit: start with 5G and go up (a sketch for checking the current value follows below). https://kb.databricks.com/jobs/job-fails-maxresultsize-exception.html – badger Nov 03 '20 at 13:51
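
Following up on the last comment, a minimal sketch (assumed names only) for checking the limit currently in effect from a notebook; the increase itself has to go into the cluster's Spark config before the driver starts, as the linked KB article describes:

// Read the configured limit from the driver's SparkConf; "4g" is Spark's default
val currentLimit = spark.sparkContext.getConf.get("spark.driver.maxResultSize", "4g")
println(s"spark.driver.maxResultSize = $currentLimit")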