
  1. Can anyone explain what exactly Input, Output, Shuffle Read, and Shuffle Write mean in the Spark UI?
  2. Also, can someone explain how the input in this job is only 25~30% of the shuffle write? As I understand it, shuffle write is the sum of the temporary data that cannot be held in memory and the data that needs to be sent to other executors during aggregation or reduction.

Code below:

hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map { case (row: Row) =>
        ((row.getString(0), row.getString(12)),
            (row.getTimestamp(11), row.getTimestamp(11), row))
    }
    .filter { case ((client, hash), (d1, d2, obj)) => d1 != null && d2 != null }
    .reduceByKey { case (x, y) =>
        if (x._1.before(y._1)) {
            if (x._2.after(y._2)) x
            else (x._1, y._2, y._3)
        } else {
            if (x._2.after(y._2)) (y._1, x._2, x._3)
            else y
        }
    }
    .count()

Here ReadDailyFileDataObject is a case class that holds the row fields as a container. A container is required because there are 30 columns, which exceeds the tuple limit of 22.

Update: I removed the case class from the code above, because I see the same issue when I use Row itself instead of the case class.

Currently I see:
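For readers following the reduce step, here is a minimal sketch of the same merge logic as a standalone function, using only the Scala and Java standard libraries. The names `MergeSketch`, `Acc`, and `merge` are illustrative and not part of the original job. It keeps the earliest first timestamp, the latest second timestamp, and the payload that came with the latest one.

```scala
import java.sql.Timestamp

object MergeSketch {
  // (earliest timestamp, latest timestamp, payload carried with the latest one)
  type Acc[A] = (Timestamp, Timestamp, A)

  // Same branching as the reduceByKey body in the question, written compactly.
  def merge[A](x: Acc[A], y: Acc[A]): Acc[A] = {
    val first = if (x._1.before(y._1)) x._1 else y._1
    // As in the original, a tie on the latest timestamp keeps y's values.
    if (x._2.after(y._2)) (first, x._2, x._3) else (first, y._2, y._3)
  }
}
```

Under these assumptions it could be plugged in as `.reduceByKey((x, y) => MergeSketch.merge(x, y))`. Note that the payload (the whole Row) still travels with every record that is shuffled.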

Task : 10/7772

Input : 2.1 GB

Shuffle Write : 14.6 GB

If it helps, I am trying to process a table stored as Parquet files, containing 21 billion rows.

Below are the parameters I am using:
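To put the two figures above side by side, a small arithmetic sketch follows. It only divides the reported sizes; the object and value names are illustrative.

```scala
object ShuffleRatio {
  // Sizes in GB as reported in the Spark UI snapshot above.
  val inputGb = 2.1
  val shuffleWriteGb = 14.6

  // GB of shuffle output written per GB of input read.
  val amplification: Double = shuffleWriteGb / inputGb

  // Input expressed as a percentage of shuffle write.
  val inputShare: Double = 100.0 * inputGb / shuffleWriteGb

  def main(args: Array[String]): Unit =
    println(s"amplification=$amplification inputShare=$inputShare")
}
```

With these numbers the shuffle write is roughly 7 times the input, so the input is about 14% of the shuffle write.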

"spark.yarn.am.memory" -> "10G"
"spark.yarn.am.cores"  -> "5"
"spark.driver.cores"   -> "5"
"spark.executor.cores" -> "10"
"spark.dynamicAllocation.enabled" -> "true"
"spark.yarn.containerLauncherMaxThreads" -> "120"
"spark.executor.memory" -> "30g"
"spark.driver.memory" -> "10g"
"spark.driver.maxResultSize" -> "9g"
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
"spark.kryoserializer.buffer" -> "10m"
"spark.kryoserializer.buffer.max" -> "2001m"
"spark.akka.frameSize" -> "2020"

The SparkContext is created as:

new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)

On YARN, I see:

Allocated CPU VCores : 95

Allocated Memory : 309 GB

Running Containers : 10

Abhishek Anand

2 Answers


It's actually hard to provide an answer without the code, but it is possible that you are going through your data multiple times, so the total volume you are processing is actually "X" times your original data.

Can you post the code you are running?

EDIT

Looking at the code, I have had this kind of issue before, and it was due to the serialization of the Row, so this might be your case as well.

What is "ReadDailyFileDataObject"? Is it a class, a case class?

I would first try running your code like this:

hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map { case (row: Row) =>
        ((row.get(0).asInstanceOf[String], row.get(12).asInstanceOf[String]),
            (row.get(11).asInstanceOf[Timestamp], row.get(11).asInstanceOf[Timestamp]))
    }
    .filter { case ((client, hash), (d1, d2)) => d1 != null && d2 != null }
    .reduceByKey { case (x, y) =>
        if (x._1.before(y._1)) {
            if (x._2.after(y._2)) x
            else (x._1, y._2)
        } else {
            if (x._2.after(y._2)) (y._1, x._2)
            else y
        }
    }
    .count()

If that gets rid of your shuffling problem, then you can refactor it a little:

  • Make it a case class, if it isn't already.
  • Create it like "ReadDailyFileDataObject(row.getInt(0), row.getString(1), etc..)"
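As a rough sketch of that refactoring, assuming Spark is on the classpath: the case class `DailyRecord`, its three fields, and the `KryoSetup` helper are hypothetical stand-ins, since the real ReadDailyFileDataObject has around 30 fields that are not shown. Registering the class with Kryo through `SparkConf.registerKryoClasses` is a common companion step, so it is included here.

```scala
import java.sql.Timestamp
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

// Hypothetical container with only the fields the job actually uses.
case class DailyRecord(client: String, hash: String, seenAt: Timestamp)

object DailyRecord {
  // Column positions 0, 12 and 11 are taken from the query in the question.
  def fromRow(row: Row): DailyRecord =
    DailyRecord(row.getString(0), row.getString(12), row.getTimestamp(11))
}

object KryoSetup {
  // Builds a SparkConf that uses Kryo and knows the container class up front.
  def conf(): SparkConf =
    new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[DailyRecord]))
}
```

Carrying a small typed record instead of the full Row keeps the shuffled value compact, which makes it easier to see whether Row serialization is what inflates the shuffle write.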

Hope this counts as an answer, and helps you find your bottleneck.

alghimo
  • Makes no sense with the code I am using. All I did was a mapping, which in the end has the same number of elements as the original, just with a container case class. This definitely cannot result in 4x the shuffle write. – Abhishek Anand Mar 29 '16 at 16:08
  • This does not provide an answer to the question. To critique or request clarification from an author, leave a comment below their post. - [From Review](/review/low-quality-posts/11814978) – BJ Myers Mar 29 '16 at 16:22
  • @BJMyers You're totally right, sorry. I'll take it into account for the next time. – alghimo Mar 29 '16 at 16:28
  • @alghimo, ReadDailyFileDataObject is already a case class; I mentioned that at the end of my post. Anyway, I will have to wait a couple of hours for the script to finish executing, then I will give your version a shot. But if I do that, there won't be any shuffling problem anyway, because the data volume itself will be much smaller. In your mapping you are removing 26 columns and keeping only 4. Shuffling only happens if the data has to be passed to another executor or cannot fit in memory. – Abhishek Anand Mar 29 '16 at 16:49
  • @AbhishekAnand I'm pretty sure the issue is with the Row. Instead of not using ReadDailyFileDataObject(row) at all, maybe you can just replace it with "row.toSeq". If that solves the issue, you can work from there to get a better structure. – alghimo Mar 29 '16 at 18:07
  • @alghimo, you were right. The issue was in the case class. The basic problem was that I was using asInstanceOf everywhere, which was creating a new object for each attribute of the object class. Then I registered my case class with Kryo, which relaxed it further. – Abhishek Anand Mar 30 '16 at 02:31
  • I take it back. After processing 20 partitions, it is back to the same nonsense. Input 1300 MB, shuffle write 6.1 GB – Abhishek Anand Mar 30 '16 at 02:48


The tooltips shown when you hover over Input, Output, Shuffle Read, and Shuffle Write are fairly self-explanatory:

INPUT: Bytes and records read from Hadoop or from Spark storage.

OUTPUT: Bytes and records written to Hadoop.

SHUFFLE_WRITE: Bytes and records written to disk in order to be read by a shuffle in a future stage.

SHUFFLE_READ: Total shuffle bytes and records read (includes both data read locally and data read from remote executors).
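The same four counters can also be read programmatically. The sketch below is not from the original answer: it assumes Spark 2.x or later (where the metrics objects are no longer wrapped in `Option`), and the class name `IoTotalsListener` is made up for illustration. It sums the byte counters over all finished tasks.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper: accumulates the four byte counters shown in the UI.
class IoTotalsListener extends SparkListener {
  val input = new AtomicLong(0L)
  val output = new AtomicLong(0L)
  val shuffleRead = new AtomicLong(0L)
  val shuffleWrite = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) { // metrics may be absent, for example for some failed tasks
      input.addAndGet(m.inputMetrics.bytesRead)
      output.addAndGet(m.outputMetrics.bytesWritten)
      shuffleRead.addAndGet(m.shuffleReadMetrics.totalBytesRead)
      shuffleWrite.addAndGet(m.shuffleWriteMetrics.bytesWritten)
    }
  }
}
```

It would be attached with `sc.addSparkListener(new IoTotalsListener)` before the job runs, and the totals read from the listener afterwards.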

In your case, 150.1 GB is the total input size of all 1409 finished tasks (i.e., the total size read from HDFS so far), and 874 GB is the total that those 1409 tasks have written to the nodes' local disks.

To understand shuffling as a whole, see "What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming?".

yjshen
  • This also doesn't make any sense in this case. I am doing a filtering operation here, and just one mapping operation before that. The mapping operation puts the elements of the row into a case class wrapper. This cannot result in the data becoming 4x its original size. – Abhishek Anand Mar 29 '16 at 16:10