I am running the code below on Azure Databricks (DBR 7.3 LTS, Spark 3.0.1, Scala 2.12), on a cluster of 20 to 35 Standard_E4as_v4 workers (32.0 GB memory, 4 cores, 1 DBU each) and a Standard_DS5_v2 driver (56.0 GB memory, 16 cores, 3 DBU).
The aim is to process ~5.5 TB of data.
I am facing the following exception: "org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1165 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB". It is thrown after 1163 out of 57071 tasks have completed, with 148.4 GiB of the data processed, in 6.1 min.
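The 4.0 GiB limit in the message appears to be the default value of spark.driver.maxResultSize. For completeness, this is how the setting can be read from the notebook (the fallback string is just a placeholder of mine):

// Read the driver result-size limit from the cluster's SparkConf;
// the second argument is only a fallback in case nothing is set explicitly.
spark.sparkContext.getConf.get("spark.driver.maxResultSize", "not set explicitly (default 4g)")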
I don't collect or transfer any data to the driver, so does partitioning the data cause this issue? If so:
- Is there a better way to partition? (See the sketch after the code below for the kind of change I have in mind.)
- How can I solve this issue?
Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
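// Dedupe: keep only the most recent record per productId, based on @ingestionTimestamp.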
val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)
val jsonDF = spark.read.json("/mnt/myfile")
val res = jsonDF
.withColumn("row", row_number().over(w))
.where($"row" === 1)
.drop("row")
res.write.json("/mnt/myfile/spark_output")
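On the partitioning question above, this is the kind of change I have in mind, but the repartition call and the choice of key are only a guess on my part, not something I have validated:

// Guess: repartition explicitly by the window key before applying the
// window function, instead of relying on whatever spark.read.json produces.
val resAlt = jsonDF
  .repartition(col("productId"))
  .withColumn("row", row_number().over(w))
  .where($"row" === 1)
  .drop("row")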
I then tried only loading and writing the data again, with no transformation at all, and faced the same issue. Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
val jsonDF = spark.read.json("/mnt/myfile")
jsonDF.write.json("/mnt/myfile/spark_output")
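One more thing I am unsure about: spark.read.json has to infer the schema by scanning the input, and I wonder whether that inference step over so many tasks is itself what is sending results back to the driver. Would supplying an explicit schema be worth trying? A rough sketch of what I mean is below; the field list is abbreviated and purely illustrative, not my real schema:

import org.apache.spark.sql.types._

// Purely illustrative schema; the real data has many more fields.
val schema = StructType(Seq(
  StructField("productId", StringType),
  StructField("@ingestionTimestamp", StringType)
))

// Skip schema inference by providing the schema up front.
val jsonDFWithSchema = spark.read.schema(schema).json("/mnt/myfile")
jsonDFWithSchema.write.json("/mnt/myfile/spark_output")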