- Can anyone explain what exactly Input, Output, Shuffle Read, and Shuffle Write represent in the Spark UI?
- Also, can someone explain how the input in this job can be only 25-30% of the shuffle write? As I understand it, shuffle write is the sum of temporary data that cannot be held in memory and data that needs to be sent to other executors during aggregation or reduction.
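For context on the aggregation step: Spark's `reduceByKey` documentation says it also merges values locally on each mapper before sending results to a reducer, similar to a MapReduce combiner. The sketch below is a plain-Scala simulation of that idea, not Spark's actual implementation. The object name, the `combineWithinPartition` helper, and the sample `(String, Int)` data are all hypothetical stand-ins.

```scala
// Pure-Scala simulation (no Spark) of map-side combining before a shuffle.
object MapSideCombineSketch {
  // One map task's partition: hypothetical (key, value) pairs.
  type Rec = (String, Int)

  // Merge every record sharing a key inside one partition before anything is
  // "written"; a partition therefore emits at most one record per distinct key.
  def combineWithinPartition(part: Seq[Rec], f: (Int, Int) => Int): Map[String, Int] =
    part.groupBy(_._1).map { case (k, recs) => k -> recs.map(_._2).reduce(f) }

  def main(args: Array[String]): Unit = {
    val partitions = List(
      List("a" -> 1, "b" -> 2, "a" -> 3),
      List("a" -> 4, "c" -> 5, "c" -> 6)
    )
    val maxF = (x: Int, y: Int) => math.max(x, y)
    val mapOutputs = partitions.map(p => combineWithinPartition(p, maxF))
    // Records each simulated map task hands to the shuffle (6 inputs -> 4 outputs).
    println(mapOutputs.map(_.size)) // List(2, 2)
    // Reduce side: merge the per-partition partial results for each key.
    val result = mapOutputs.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(maxF) }
    println(result.toList.sortBy(_._1)) // List((a,4), (b,2), (c,6))
  }
}
```

In the job below, the values are `(Timestamp, Timestamp, Row)` rather than `Int`, so every record that survives the local combine still carries a full `Row` into the shuffle.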
Code below:
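    import org.apache.spark.sql.Row

    hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
      .rdd
      // key: (client, hash) from columns 0 and 12; value: (start, end, row), both from column 11
      .map { case (row: Row) =>
        ((row.getString(0), row.getString(12)), (row.getTimestamp(11), row.getTimestamp(11), row))
      }
      // drop rows with a null timestamp
      .filter { case ((client, hash), (d1, d2, obj)) => d1 != null && d2 != null }
      // per key: keep the earliest start, the latest end, and the row carrying the latest end
      .reduceByKey { case (x, y) =>
        if (x._1.before(y._1)) {
          if (x._2.after(y._2)) x else (x._1, y._2, y._3)
        } else {
          if (x._2.after(y._2)) (y._1, x._2, x._3) else y
        }
      }
      .count()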
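The reduce function can be checked outside Spark. Below is a minimal sketch that copies the same branching into a standalone `merge` and folds a few records through it. The `String` payload is a hypothetical stand-in for the Spark `Row`, and `MergeSketch` is an illustrative name. As far as I can read the code, because both timestamps start out as column 11, each key ends up with (min, max) of that column plus the payload that accompanied the max.

```scala
import java.sql.Timestamp

object MergeSketch {
  // Value carried per key: (earliest timestamp, latest timestamp, payload).
  // The payload is a String here, standing in for the Spark Row.
  type V = (Timestamp, Timestamp, String)

  // Same branching as the reduceByKey lambda: keep the earlier start, the later
  // end, and the payload that came with the later end (ties go to y).
  def merge(x: V, y: V): V =
    if (x._1.before(y._1)) {
      if (x._2.after(y._2)) x else (x._1, y._2, y._3)
    } else {
      if (x._2.after(y._2)) (y._1, x._2, x._3) else y
    }

  def main(args: Array[String]): Unit = {
    def ts(ms: Long) = new Timestamp(ms)
    // Like the job, each record starts with start == end.
    val records = Seq[V]((ts(300), ts(300), "r1"), (ts(100), ts(100), "r2"), (ts(200), ts(200), "r3"))
    val (start, end, payload) = records.reduce((a, b) => merge(a, b))
    println(s"${start.getTime} ${end.getTime} $payload") // 100 300 r1
  }
}
```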
In the original version of this code, ReadDailyFileDataObject was a case class that held the row fields as a container. A container was needed because there are 30 columns, which exceeds Scala's tuple limit of 22.
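A quick check of the tuple-limit point: Scala's tuples stop at `Tuple22`, but since Scala 2.11 a case class may declare more than 22 fields. This sketch assumes Scala 2.11 or later (it will not compile on 2.10). `Wide23` and its fields `c1`..`c23` are hypothetical placeholders, not the real table's columns.

```scala
// Tuples stop at Tuple22, but (since Scala 2.11) a case class may have more fields.
object WideContainerSketch {
  // Hypothetical 23-field container; c1..c23 are placeholders.
  case class Wide23(
    c1: Int, c2: Int, c3: Int, c4: Int, c5: Int, c6: Int, c7: Int, c8: Int,
    c9: Int, c10: Int, c11: Int, c12: Int, c13: Int, c14: Int, c15: Int, c16: Int,
    c17: Int, c18: Int, c19: Int, c20: Int, c21: Int, c22: Int, c23: Int)

  def main(args: Array[String]): Unit = {
    val w = Wide23(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
    println(w.productArity) // 23
    println(w.c23)          // 23
  }
}
```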
Update: I removed the case class, since I see the same issue when I use Row itself instead of the case class.
Currently I see:
- Tasks: 10/7772
- Input: 2.1 GB
- Shuffle Write: 14.6 GB
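For reference, the current figures work out as follows. This is plain arithmetic on the numbers above and gives a lower percentage than the 25-30% quoted earlier:

```latex
\frac{\text{Input}}{\text{Shuffle Write}} = \frac{2.1\ \text{GB}}{14.6\ \text{GB}} \approx 0.144\ (\approx 14\%),
\qquad
\frac{\text{Shuffle Write}}{\text{Input}} = \frac{14.6}{2.1} \approx 6.95
```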
If it helps, I am trying to process a table stored as Parquet files, containing 21 billion rows.
Below are the parameters I am using:
    "spark.yarn.am.memory" -> "10G"
    "spark.yarn.am.cores" -> "5"
    "spark.driver.cores" -> "5"
    "spark.executor.cores" -> "10"
    "spark.dynamicAllocation.enabled" -> "true"
    "spark.yarn.containerLauncherMaxThreads" -> "120"
    "spark.executor.memory" -> "30g"
    "spark.driver.memory" -> "10g"
    "spark.driver.maxResultSize" -> "9g"
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
    "spark.kryoserializer.buffer" -> "10m"
    "spark.kryoserializer.buffer.max" -> "2001m"
    "spark.akka.frameSize" -> "2020"
The SparkContext is created as:

    new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)
On YARN, I see:
- Allocated CPU VCores: 95
- Allocated Memory: 309 GB
- Running Containers: 10
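The YARN figures are consistent with the configuration if, as assumed here, one of the 10 running containers is the yarn-client ApplicationMaster (spark.yarn.am.cores = 5, spark.yarn.am.memory = 10G) and the other nine are executors (spark.executor.cores = 10, spark.executor.memory = 30g):

```latex
\underbrace{1 \times 5}_{\text{AM}} + \underbrace{9 \times 10}_{\text{executors}} = 95\ \text{VCores},
\qquad
\underbrace{1 \times 10}_{\text{AM}} + \underbrace{9 \times 30}_{\text{executors}} = 280\ \text{GB of heap}
```

Under that assumption, the remaining ~29 GB of the 309 GB would presumably be the per-container memory overhead YARN reserves on top of each heap. The exact amount depends on the Spark version's overhead defaults.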