
I am a newbie to Spark. I'm trying to read a local csv file within an EMR cluster. The file is located in: /home/hadoop/. The script that I'm using is this one:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv('/home/hadoop/observations_temp.csv', header=True)

When I run the script, it raises the following error:

pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020/home/hadoop/observations_temp.csv

Then, I found out that I have to add file:// in the file path so it can read the file locally:

df = spark.read.csv('file:///home/hadoop/observations_temp.csv', header=True)

But this time, the above approach raised a different error:

Lost task 0.3 in stage 0.0 (TID 3,
ip-172-31-41-81.eu-west-1.compute.internal, executor 1): java.io.FileNotFoundException: File file:/home/hadoop/observations_temp.csv does not exist

I think this is because the file:// scheme only reads the file from the local disk and does not distribute it to the other nodes.

Do you know how I can read the CSV file and make it available to all the other nodes?

ultraInstinct
  • And, as an aside, I found storing data files in S3 made life a bit simpler, once you have granted your cluster access to your bucket/s. I know this doesn't solve your problem directly, but thought I'd mention anyway. – ImDarrenG Feb 07 '17 at 14:16
  • Yes, it is located on the instance that runs the driver (the master node, if that's what you mean) – ultraInstinct Feb 07 '17 at 14:19
  • The original file is in S3 but I have to download it first to be able to process it and convert it to another format. The output is the one that I'm trying to read. – ultraInstinct Feb 07 '17 at 14:20
  • How are you downloading the file? – ImDarrenG Feb 07 '17 at 14:27
  • I download the file with the AWS CLI: os.system("aws s3 cp s3://raw_data/files/observation.protob /home/hadoop/mount_point/s3"). I download it to a different volume (because of the file size). From there I can read it and generate the output file in /home/hadoop/ – ultraInstinct Feb 07 '17 at 14:35
  • What is your cluster manager ? Spark standalone or YARN ? – mrsrinivas Feb 07 '17 at 16:19
  • Related : https://stackoverflow.com/questions/38879478/sparkcontext-addfile-vs-spark-submit-files – duplex143 Jul 22 '22 at 08:44

3 Answers


You are right: the file is missing from your worker nodes, and that is what raises the error you got.

The official documentation says the following (see External Datasets):

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
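Both errors in the question follow from how a path is qualified before it is opened. The sketch below is a simplified illustration of that resolution, not Spark's or Hadoop's actual implementation. The `qualify` helper is hypothetical, and the default filesystem value is an assumption taken from the first error message.

```python
from urllib.parse import urlparse


def qualify(path: str, default_fs: str) -> str:
    """Return the URI that a path would resolve to (simplified illustration)."""
    if urlparse(path).scheme:
        # An explicit scheme (file://, hdfs://, s3://) is kept as-is.
        return path
    # No scheme: the path is resolved against the cluster's default filesystem.
    return default_fs.rstrip("/") + path


# Assumed value of fs.defaultFS, copied from the error message in the question.
DEFAULT_FS = "hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020"

print(qualify("/home/hadoop/observations_temp.csv", DEFAULT_FS))
print(qualify("file:///home/hadoop/observations_temp.csv", DEFAULT_FS))
```

The first path ends up pointing into HDFS, where the file was never uploaded. The second stays a local path, so every executor looks on its own disk, and the file exists only on the driver's machine.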

So you have two options:

Copy the file to the same path on every worker node before starting the job;

Or upload it to HDFS (the recommended solution), for example:

hadoop fs -put localfile /user/hadoop/hadoopfile.csv

Now you can read it with :

df = spark.read.csv('/user/hadoop/hadoopfile.csv', header=True)
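If the file is produced by the same script that reads it, as in the question, the upload can be run from Python too. This is a minimal sketch, assuming the `hadoop` CLI is on the driver's PATH and that `spark` is an existing SparkSession. The helper names `hdfs_put_command` and `upload_and_read` are hypothetical.

```python
import subprocess


def hdfs_put_command(local_path: str, hdfs_path: str) -> list:
    """Build the `hadoop fs -put` command as an argument list."""
    # -f overwrites the destination if it already exists.
    return ["hadoop", "fs", "-put", "-f", local_path, hdfs_path]


def upload_and_read(spark, local_path: str, hdfs_path: str):
    """Copy a driver-local file into HDFS, then read it as a DataFrame."""
    # check=True raises CalledProcessError if the upload fails.
    subprocess.run(hdfs_put_command(local_path, hdfs_path), check=True)
    # A scheme-less path resolves against the default filesystem (HDFS here).
    return spark.read.csv(hdfs_path, header=True)
```

A call could look like `df = upload_and_read(spark, "/home/hadoop/observations_temp.csv", "/user/hadoop/observations_temp.csv")`.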

It seems that you are also using AWS S3. You can also read the file directly from S3 without downloading it first (with the proper credentials, of course).
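A direct read from S3 might look like the sketch below. The bucket and key are placeholders, the helper names are hypothetical, and `spark` is assumed to be an existing SparkSession. On EMR the `s3://` scheme is usually the right one. On a plain Hadoop installation `s3a://` is the more likely choice.

```python
def s3_uri(bucket: str, key: str, scheme: str = "s3") -> str:
    """Build an S3 URI; a leading slash on the key is dropped."""
    return f"{scheme}://{bucket}/{key.lstrip('/')}"


def read_csv_from_s3(spark, bucket: str, key: str, scheme: str = "s3"):
    """Read a CSV object from S3 into a DataFrame via an existing SparkSession."""
    # The cluster needs read access to the bucket (e.g. through its instance role).
    return spark.read.csv(s3_uri(bucket, key, scheme), header=True)
```

For example, `df = read_csv_from_s3(spark, "my-bucket", "converted/observations_temp.csv")`, where both names are made up for illustration. Since every node can reach S3, no file has to exist on any local disk.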

Some suggest that the --files option of spark-submit uploads the files to the executors' working directories. I don't recommend this approach unless your CSV file is very small, in which case you don't really need Spark.

Otherwise, I would stick with HDFS (or any distributed file system).

eliasah
  • Since the answer is now 2 years old, I would like to know whether there is any update on this problem. It seems like a strange and basic shortcoming for Apache Spark that it can't access files in arbitrary locations. I mean, what kind of analytical engine can't even access files properly? – Amir Aug 22 '19 at 12:07
  • Thanks! Instead of using a cluster, I ran it with master=local[4], so I don't need to copy the file to other machines or put it into Hadoop. By the way, if you need a cluster to process your file, that indicates you need a distributed file system and should put your file into it. Distributing files to the worker nodes by hand is mostly not feasible. – Onur Demir Nov 08 '21 at 08:31

I think what you are missing is explicitly setting the master when initializing the SparkSession. Try something like this:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Then read the file the same way as before:

df = spark.read.csv('file:///home/hadoop/observations_temp.csv')

This should solve the problem, because with a local master everything runs on the machine that holds the file.

Nishant Sethi

This might be useful for someone running Zeppelin on a Mac using Docker.

  1. Copy files to custom folder : /Users/my_user/zeppspark/myjson.txt

  2. docker run -p 8080:8080 -v /Users/my_user/zeppspark:/zeppelin/notebook --rm --name zeppelin apache/zeppelin:0.9.0

  3. On Zeppelin you can run this to get your file:

%pyspark

json_data = sc.textFile('/zeppelin/notebook/myjson.txt')

Impermanence