0

I have a three node Spark Cluster using Yarn as cluster manager [running on three node hadoop cluster].

Consider that, My Hadoop Cluster is having three nodes [Master, Slave1 and Slave2] Resourcemanager is running on Master and NodaManager on Slave1 & Slave2. Spark Cluster is also present on three nodes.

At Master node, I have created a folder /data/nfsshare which I have mounted on Slave1 and Slave2 as /nfsshare. Now I have kept one file abc.txt in /data/nfsshare folder which is visible to both slave1 and slave2 at /nfsshare location.

I have created a small spark job for copying abc.txt from /data/nfsshare location to HDFS and also perform word count and save its result as well in HDFS.

def write2HDFS(args:Array[String]){

val  source = args(0)
val  destination = args(1)   
val processedDataDestination = args(2)
val conf = new SparkConf().setAppName("WCRemoteReadHDFSWrite").set("spark.hadoop.validateOutputSpecs", "true");
val sc = new SparkContext(conf)

logger.info("STARTING READ")

val rdd = sc.textFile(source)

logger.info("READING DONE")
logger.info("STARTING WRITE")
logger.info("rdd.toDebugString >>>>>> "+rdd.toDebugString)
logger.info("rdd.getNumPartitions >>>>>>>>" +rdd.getNumPartitions)
// rdd.coalesce(1)
// logger.info("rdd.getNumPartitions after coalesce(1) >>>>>>>>" +rdd.getNumPartitions)

rdd.saveAsTextFile(destination)


logger.info("DONE")
rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).saveAsTextFile(processedDataDestination) 

sc.stop}

When I am trying execute this code using command:

./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar file:///data/nfsshare/abc.txt hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWrite1MB hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWriteP1MB

I am getting following intermittent issues:

1) InputPath Doesnt exist: file:/data/nfsshare/abc.txt, was coming intermittently during some runs of this job [whereas file was present at shared location/mounted path]

2) Sometimes/intermittently job status was coming as failed but output directories were getting created with the required data

3) Output Directory already exists: Sometimes HDFS output directory existing issue was coming --> This got resolved by increasing executor and driver memory

-->I tried running this job in both clustered and client deployment modes but I am getting same issue in both the cases.

I am not sure if shared location path as /data/nfsshare at Master and /nfsshare at slaves is making any difference? because at command line I am passing /data/nfsshare as file path location and hence whenever any executor ran on slaves would have looked for /data/nfsshare would have failed.

I tried running this job on all three nodes but these intermittent issues still persists.

Any expert advice would be appreciated.

If at all there is any other much better way to putting file from any staging area/mounted location to HDFS is possible then please share that as well.

Regards, Bhupesh

Chauhan B
  • 461
  • 8
  • 27

3 Answers3

1

Q1, Q2) You mentioned you nfs mounted the directory from /data/nfsshare on local to /nfsshare on HDFS. If you've successfully done this and you've verified that it is working, why don't you use that as your input path?

Things get kind of tricky in YARN mode when you try to use local file system. If you're using distributed computing, its best to keep your inputs should be in HDFS. So, your spark-submit command becomes,

./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar /nfsshare/path /user/hduser/hdfsWrite1MB /user/hduser/hdfsWriteP1MB

Note that I omitted hdfs://, this is because default file system in Spark environment is HDFS.

Q3) Output Directory already exists: You could do this before saving the file as explained here,

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://host:port/"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(/path/to/output), true) } 
catch { case _ : Throwable => { } }

or you can simply append current time stamp to your output path, if you don't want to delete things again and again. This is only if you're dealing with RDDs, Dataframe API has ability overwrite on existing paths.

PS: your Q1 show input path as file:/data/nfsshare/test-1MB whereas input in spark-submit command shows file:///data/nfsshare/abc.txt. Is abc.txt a directory?

Let me know if this helps. Cheers.

Chitral Verma
  • 2,695
  • 1
  • 17
  • 29
  • Thanks for your valuable inputs Chitral Verma. I think I was not able to explain NFS mounting part properly. Actually the thing was. I was having three nodes Master, Slave1 and Slave2. I have created one folder /data/nfsshare on Master which was mounted with Slave1 and Slave2 and was visible there as /nfsshare not as /data/nfsshare. – Chauhan B Jun 21 '17 at 09:56
0
  1. it's better to upload your input file to hdfs without spark, just upload it to hdfs with hdfs dfs -copyFromLocal or you just may try to upload it with hdfs client library, but single threaded without spark's api. Usually assumption is that the input data is already in distributed file system(s3, hdfs, whatever). You might see all kind of effects when using nfs. So from design perspective there is some part of pipeline that puts data into s3/hdfs, and only then spark's parallel processing kicks in.
  2. Yes, you should clean your output directory if you are running same job over and over again, I think there is spark configuration that will permit you to disable this validation, however better design your application to write into new path every time
Igor Berman
  • 1,522
  • 10
  • 16
0

Actually I was facing InputPath Doesnt exist: file:/data/nfsshare/abc.txt intermittently because of the mounted folder names. Once I kept same name on all nodes [/data/nfsshare]. this issue went off.

I am assuming that when I was running my spark job in Cluster mode, YARN was deciding where to run driver and executors, hence if at all executors were running on Master node [from where /data/nfsshare] was visible, job was working fine whereas for other executors where this path coming as /nfsshare, this throwing path related issue. Once path issue was resolved all executors were able to see file path as /data/nfsshare

Also for output directory already exist, Chitral Verma's code snippet helped.

Chauhan B
  • 461
  • 8
  • 27