
I use Hortonworks 2.6 with 5 nodes. I spark-submit to YARN (with 16GB RAM and 4 cores).

I have an RDD transformation that runs fine in local mode but not with the yarn master URL.

rdd1 has values like:

id  name    date
1   john    10/05/2001 (dd/mm/yyyy)
2   steve   11/06/2015

I'd like to change the date format from dd/mm/yyyy to mm/dd/yy, so I wrote a method transformations.transform that I use in an RDD.map function as follows:

rdd2 = rdd1.map { rec => (rec.split(",")(0), transformations.transform(rec)) }

transformations.transform method is as follows:

object transformations {
  def transform(t: String): String = {
    val msg = s">>> transformations.transform($t)"
    println(msg)     
    msg
  }
}
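
For reference, the reformatting I eventually want would look something like the following: a minimal sketch using java.time, assuming the records are comma-separated as id,name,date (the transform above is just a logging stub I used for debugging):

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object transformations {
  private val inFormat  = DateTimeFormatter.ofPattern("dd/MM/yyyy")
  private val outFormat = DateTimeFormatter.ofPattern("MM/dd/yy")

  // reformat the date field of a record like "1,john,10/05/2001"
  def transform(rec: String): String = {
    val fields = rec.split(",")
    val date   = LocalDate.parse(fields(2).trim, inFormat)
    (fields.dropRight(1) :+ date.format(outFormat)).mkString(",")
  }
}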

The stub above works fine in local mode but not on the cluster. The output is as if the map looked like this instead:

rdd2 = rdd1.map { rec => (rec.split(",")(0), rec) } 

rec does not seem to be passed to the transformations.transform method.

I do use an action to trigger the transformations.transform method, but no luck:

val rdd3 = rdd2.count()
println(rdd3)

println prints the count, but the transformations.transform method does not appear to be called. Why?

Jacek Laskowski
Pyd

3 Answers


tl;dr Enable log aggregation in Hadoop and use yarn logs -applicationId to see the logs (the println output ends up in the logs of the two default Spark executors). Don't forget to bounce the YARN cluster with sbin/stop-yarn.sh followed by sbin/start-yarn.sh (or simply sbin/stop-all.sh and sbin/start-all.sh).


The reason you don't see println's output in the logs on YARN is that when a Spark application is spark-submit'ed to a YARN cluster, three YARN containers are launched: one container for the ApplicationMaster and two containers for Spark executors.

RDD.map is a transformation that always runs on a Spark executor (as a set of tasks, one per RDD partition). That means the println output goes to the executors' logs.

NOTE: In local mode, a single JVM runs both the driver and the single executor (as a thread).
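
One way to see this for yourself (a quick sketch, reusing rdd1 and transformations from the question): tag every record with the executor's host name, or bring the results back to the driver before printing.

import java.net.InetAddress

// runs on the executors: the host name shows up in the *executor* logs
val rdd2 = rdd1.map { rec =>
  println(s">>> on ${InetAddress.getLocalHost.getHostName}: $rec")
  (rec.split(",")(0), transformations.transform(rec))
}

// runs on the driver: collect() ships the results back first,
// so these printlns do show up in the driver's stdout
rdd2.collect().foreach(println)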

To my surprise, you won't be able to find the output of println in the ResourceManager web UI at http://localhost:8088/cluster for the Spark application either.

[screenshot: the ResourceManager web UI]

What worked for me was to enable log aggregation using the yarn.log-aggregation-enable YARN property (which you can read about in the article Enable Log Aggregation):

<!-- etc/hadoop/yarn-site.xml -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <value>3600</value>
</property>

With that configuration change, you simply spark-submit --master yarn to submit a Spark application followed by yarn logs -applicationId (I used yarn logs -applicationId application_ID > output.txt and reviewed output.txt).

You should find >>> transformations.transform(1,john,10/05/2001) there.

The Code

The code I used was as follows:

import org.apache.spark.SparkContext

object HelloRdd extends App {

  object transformations {
    def transform(t: String): String = {
      val msg = s">>> transformations.transform($t)"
      println(msg)
      msg
    }
  }

  val sc = SparkContext.getOrCreate()
  val rdd1 = sc.textFile(args(0)) // e.g. the hello.txt with the records from the question
  val rdd2 = rdd1.map { rec => (rec.split(",")(0), transformations.transform(rec)) }
  rdd2.count() // an action that triggers the transformation (and the printlns)
}
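
A caveat worth mentioning: the Spark Quick Start advises defining a main() method instead of extending scala.App, as subclasses of scala.App may not work correctly (delayed initialization). An equivalent sketch with an explicit main:

import org.apache.spark.SparkContext

object HelloRdd {

  object transformations {
    def transform(t: String): String = {
      val msg = s">>> transformations.transform($t)"
      println(msg)
      msg
    }
  }

  // explicit main instead of `extends App`
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate()
    val rdd1 = sc.textFile(args(0))
    val rdd2 = rdd1.map { rec => (rec.split(",")(0), transformations.transform(rec)) }
    rdd2.count()
  }
}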

The following is the spark-submit I used for testing.

$ HADOOP_CONF_DIR=/tmp ~/dev/apps/spark/bin/spark-submit \
  --master yarn \
  target/scala-2.11/spark-project_2.11-0.1.jar `pwd`/hello.txt
Jacek Laskowski
  • It's printing now! Thank you – Pyd Apr 05 '18 at 05:38
  • It did print, but the way **we** looked at this problem was wrong (!) I hope we all learnt something new today. Thanks. – Jacek Laskowski Apr 05 '18 at 06:50
  • I have a global variable in a DVT `object` called `element`; it carries an empty value, i.e. `List()`, in cluster mode. Is there any fix? `trait variables { val element: List[List[String]] = List() }` In the transformations page I use it as DVT.element, but it returns an empty value in cluster mode – Pyd Apr 05 '18 at 09:22
  • Please ask another question. – Jacek Laskowski Apr 05 '18 at 09:55

You really don't provide enough information, and

Yes, I did. In local it's working fine, it's executing the if loop, but in cluster the else is executed

is contradictory to

the method inside the map is not accessible while running in cluster

If it's executing the else branch, it doesn't have any reason to call the method in the if branch, so it doesn't matter whether it's accessible.

And if the problem was that the method is inaccessible, you'd see exceptions being thrown, e.g. ClassNotFoundException or AbstractMethodError; Scala wouldn't just decide to ignore the method call instead.

But given your code style, I am going to guess that `transformations` is a `var`. Then it's likely that the code which sets it isn't executed on the driver (where the `if` is executed). In local mode that doesn't matter, but in cluster mode it just sets the copy of `transformations` on whichever node it runs on.

This is the same issue described at https://spark.apache.org/docs/latest/rdd-programming-guide.html#local-vs-cluster-modes:

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode.
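
To make that concrete, here is the classic shape of the bug from that guide (a sketch, assuming sc and an rdd are in scope; not the asker's actual code):

var counter = 0
// each executor mutates its own deserialized copy of `counter`,
// so the driver's copy stays 0 in cluster mode
rdd.foreach(_ => counter += 1)
println(counter)  // 0 on a cluster; "works" in local mode only by accident

// if you genuinely need cross-executor mutation, use an accumulator
val acc = sc.longAccumulator("records")
rdd.foreach(_ => acc.add(1))
println(acc.value)  // correct in both local and cluster mode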

Alexey Romanov
  • Actually the `transformations.transform` method is called inside the if branch, and execution does reach the if branch, but instead of entering the transform method it behaves as `.map(rec => (rec.split(",")(0), rec))` instead of `.map(rec => (rec.split(",")(0), transformations.transform(rec)))` – Pyd Mar 27 '18 at 07:31
  • So is there any alternative way to call `transformations.transform` inside map? – Pyd Mar 27 '18 at 07:44
  • Another possibility is that `transformations.transform` _used_ to be defined differently and you have a JAR with the old version on the node for some reason. – Alexey Romanov Mar 27 '18 at 07:49
  • `println("from transformations")` is not printed inside the transformations method! – Pyd Mar 27 '18 at 09:27
  • If that's why you think it isn't getting executed, this is wrong, see https://stackoverflow.com/questions/33225994/spark-losing-println-on-stdout#comment62643591_33226090. – Alexey Romanov Mar 27 '18 at 09:38
  • Do we have any alternative way to call that method in map? – Pyd Mar 27 '18 at 10:05

Why is the code inside RDD.map not executed with count?
I want to change the date format from (dd/mm/yyyy) to (mm/dd/yy), so using a method called transform inside transformations(object) in map() function

If you are only looking to change the date format, then I would suggest not going through such complexity, as it is very difficult to analyze the cause of the issue. I would suggest using DataFrames instead of RDDs, as there are many built-in functions to meet your needs. For your specific requirement, the to_date and date_format built-in functions should do the trick.

First of all, read the data to dataframe as

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", true)
  .load("path to the data file")

Then just apply the to_date and date_format functions as

import org.apache.spark.sql.functions._
df.withColumn("date2", date_format(to_date(col("date"), "dd/MM/yyyy"), "MM/dd/yy")).show(false)

and you should get

+---+-----+----------+--------+
|id |name |date      |date2   |
+---+-----+----------+--------+
|1  |john |10/05/2001|05/10/01|
|2  |steve|11/06/2015|06/11/15|
+---+-----+----------+--------+

Simple, isn't it?

Ramesh Maharjan