
I am trying to read a file submitted via `spark-submit` to a YARN cluster in client mode. Putting the file in HDFS is not an option. Here's what I've done:

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  if (args != null && args.length > 0) {
    val inputfile: String = args(0)

    // extract the file name, e.g. train.csv
    val input_filename = inputfile.split("/").toList.last

    val spark = SparkSession.builder().getOrCreate()

    // read the file from the local path that SparkFiles resolves to
    val d = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(SparkFiles.get(input_filename))
    d.show()
  }
}

and submitted it to YARN this way:

spark2-submit \
--class "com.example.HelloWorld" \
--master yarn --deploy-mode client \
--files repo/data/train.csv \
--driver-cores 2 helloworld-assembly-0.1.jar repo/data/train.csv

but I got an exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://xxxxx.xxxxx.xxxx.com:8020/tmp/spark-db3ee991-7f3d-427c-8479-aa212f906dc5/userFiles-040293ee-0d1f-44dd-ad22-ef6fe729bd49/train.csv; 

and I've also tried:

val input_filename_1 = "file://" + SparkFiles.get(input_filename)
println(input_filename_1)

spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(input_filename_1)

and still got a similar error:

 file:///tmp/spark-fbd46e9d-c450-4f86-8b23-531e239d7b98/userFiles-8d129eb3-7edc-479d-aeda-2da98432fc50/train.csv
 Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/spark-fbd46e9d-c450-4f86-8b23-531e239d7b98/userFiles-8d129eb3-7edc-479d-aeda-2da98432fc50/train.csv;
  • @user10465355 not a duplicate. The accepted answer involved putting the data in HDFS. That's not an option in my case, hence submitting the file with `--files`. – DougKruger Nov 30 '18 at 12:14
  • Have you tried to use an absolute path (something like `/my-absolute-path/repo/data/train.csv`) in the `--files` option of the call to `spark-submit`? – Romain Nov 30 '18 at 12:19
  • @vindev: How's it a duplicate? I'm using the same `SparkFiles.get()` as suggested in the answer and having this error. – DougKruger Nov 30 '18 at 12:24
  • @Romain: I could see yarn pick it up using the absolute path: `18/11/30 11:54:35 INFO yarn.Client: Uploading resource file:/vol1/user/adam/my_projects/repo/data/train.csv -> hdfs://xxxxxxxx:8020/user/adam/.sparkStaging/application_1541792367360_115525/train.csv` – DougKruger Nov 30 '18 at 12:28
  • @DougKruger Weird, the second attempt should work with the full path `file:///tmp/spark-fbd46e9d-c450-4f86-8b23-531e239d7b98/userFiles-8d129eb3-7edc-479d-aeda-2da98432fc50/train.csv` including the leading `file://`. It's clear that in the first attempt it tries to read the file from HDFS, and that's not the expected behavior. – Romain Nov 30 '18 at 12:55
  • @Romain: exactly my point! – DougKruger Nov 30 '18 at 12:56
  • @DougKruger I've nothing to test right now, so I'm sorry to be unable to help further. Maybe have a look at the way you get the `SparkSession` (`SparkSession.builder().getOrCreate()`), as sketched below. – Romain Nov 30 '18 at 13:03
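
For reference, a minimal sketch of the variant the comments converge on: obtain the session explicitly via the builder and keep the `file://` scheme so the path is not resolved against the cluster's default HDFS file system. The `train.csv` name follows the question; per the discussion above this is what should work in client mode, not a confirmed fix.

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// In client mode, SparkFiles.get resolves to the driver-local temp dir;
// the file:// scheme keeps Spark from prepending the default HDFS prefix.
val localPath = "file://" + SparkFiles.get("train.csv")
println(localPath)

val d = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(localPath)
d.show()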

1 Answer


I tried the same scenario with `--files test.csv` and with `spark.sparkContext.addFile("test.csv")`:

spark.sparkContext.addFile("test.csv")
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("file://" + SparkFiles.get("test.csv"))

The path you get from `SparkFiles.get("test.csv")`, e.g. `/tmp/spark-9c4ea9a6-95d7-44ff-8cfb-1d9ce9f30638/userFiles-f8909daa-9710-4416-b0f0-9d9043db5d8c/test.csv`, is created on the local file system of the machine where you submit the job.

So the workers do not have this file to read. The problem might be with using `spark.read.csv`.
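
To see where the file actually lives, a small check (my sketch, not from the original answer; it assumes a spark-shell session `spark` and the `test.csv` name above) can compare the driver's view with the executors' view:

import java.io.File
import org.apache.spark.SparkFiles

// On the driver: SparkFiles.get points into the submitting machine's temp dir.
println(new File(SparkFiles.get("test.csv")).exists())

// Inside tasks: each executor resolves SparkFiles.get against its own
// working directory, so this shows whether the file ever reached the workers.
spark.sparkContext.parallelize(1 to 3)
  .map(_ => new File(SparkFiles.get("test.csv")).exists())
  .collect()
  .foreach(println)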

Edit:

I tried copying the locally created file to the other nodes, and it worked.

Hope this helps.

  • I think so too; the workers do not have this file to read. Apart from `spark.read.csv`, any other suggestions as to why the workers are not getting this file? – DougKruger Nov 30 '18 at 14:05
  • so there's really no way for Spark to ship the files to all nodes without manually copying the file to all nodes or saving it to HDFS? – DougKruger Nov 30 '18 at 14:10
  • I couldn't find any way. This addFile method adds the file to Spark, but the driver and the workers keep their files in different places according to their configurations. I tried accessing it within a map function as follows: `df.map{x => firstLine(new File(SparkFiles.get("test.csv")))}.show`. That worked too. The firstLine function was found here [link](https://stackoverflow.com/questions/8865434/how-to-get-first-line-from-file-in-scala) – Pubudu Sitinamaluwa Nov 30 '18 at 14:37
  • I have a kind of a dumb suggestion. You can read it into an `Array[String]` line by line -> create an RDD by parallelizing it -> split into fields with a map function -> create a schema -> create a DataFrame -> cast to the required types with `sql.functions` :D (a sketch of this recipe follows these comments) – Pubudu Sitinamaluwa Nov 30 '18 at 14:56
  • oh okay, I see your point. You're the man! – DougKruger Nov 30 '18 at 15:06
  • `spark.sparkContext.addFile("/home/vaquarkhan/test.csv"); val df = spark.read.option("header", "true").option("inferSchema", "true").csv("file:///home/vaquarkhan/test.csv")` – vaquar khan Oct 22 '20 at 19:11
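
A minimal sketch of the "read locally, then parallelize" recipe from the comments above, assuming a header row and comma-separated fields; the `localPath` value is hypothetical, and all columns come out as strings (cast them afterwards with `sql.functions` as suggested):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()
val localPath = "/path/to/repo/data/train.csv"  // hypothetical driver-local path

// Read the whole file on the driver only; executors never touch the path.
val lines = scala.io.Source.fromFile(localPath).getLines().toSeq
val header = lines.head.split(",")
val schema = StructType(header.map(StructField(_, StringType, nullable = true)))

// Ship the parsed rows to the cluster and build a DataFrame from them.
val rows = spark.sparkContext.parallelize(lines.tail)
  .map(line => Row(line.split(",", -1): _*))
val df = spark.createDataFrame(rows, schema)
df.show()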