20

I have one question: how do I load a local file (not on HDFS, not on S3) with sc.textFile in PySpark? I read this article, copied sales.csv to the master node's local file system (not HDFS), and finally executed the following:

sc.textFile("file:///sales.csv").count()

but it returns the following error, saying that file:/sales.csv does not exist:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 10, ip-17x-xx-xx-xxx.ap-northeast-1.compute.internal): java.io.FileNotFoundException: File file:/sales.csv does not exist

I tried file://sales.csv and file:/sales.csv, but both failed as well.

I would appreciate any advice on how to load a local file.


Note 1:

  • My environment is Amazon emr-4.2.0 + Spark 1.5.2.
  • All ports are open.

Note 2:

I confirmed that loading the file from HDFS or S3 works.

Here is the code for loading from HDFS: download the CSV, copy it to HDFS in advance, then load it with sc.textFile("/path/at/hdfs").

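import commands  # Python 2 standard library module (removed in Python 3)

# Download the sample CSV to the current directory on the master node
commands.getoutput('wget -q https://raw.githubusercontent.com/phatak-dev/blog/master/code/DataSourceExamples/src/main/resources/sales.csv')
# Copy it into HDFS, overwriting any existing copy
commands.getoutput('hadoop fs -copyFromLocal -f ./sales.csv /user/hadoop/')
sc.textFile("/user/hadoop/sales.csv").count()  # returns 15, the number of lines in the CSV file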

Here is the code for loading from S3: put the CSV file on S3 in advance, then load it with sc.textFile("s3n://path/at/s3") using the "s3n://" scheme.

sc.textFile("s3n://my-test-bucket/sales.csv").count() # also returns "15" 
Community
Taka4Sato

3 Answers

12

The file read occurs on the executor nodes. For your code to work, you should distribute your file to all nodes.

If the Spark driver program runs on the same machine where the file is located, you could instead read the file on the driver (e.g. with f=open("file").read() in Python) and then call sc.parallelize to convert the file content to an RDD.
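The driver-side approach can be sketched as follows. This is a minimal sketch, not a definitive implementation: the helper names read_local_lines and local_file_to_rdd are hypothetical, sc is assumed to be the SparkContext already available in the PySpark shell, and the file is assumed to be small enough to fit in the driver's memory.

```python
def read_local_lines(path):
    """Read a text file on the driver and return its lines without newlines."""
    with open(path) as f:
        return f.read().splitlines()


def local_file_to_rdd(sc, path, num_slices=None):
    """Turn a driver-local file into an RDD with one element per line.

    `sc` is assumed to be an existing SparkContext; only the driver
    touches the local file, so the executors do not need a copy of it.
    """
    lines = read_local_lines(path)
    if num_slices is None:
        return sc.parallelize(lines)
    return sc.parallelize(lines, num_slices)
```

With the file from the question in the driver's working directory, local_file_to_rdd(sc, "sales.csv").count() would then count the lines without any executor reading the local disk.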

Havnar
facha
  • facha, thank you for the comment. I see why my code failed: the file has to be on all slave nodes (not only on the cluster's master node)! – Taka4Sato Feb 01 '16 at 17:17
4

If you're running in cluster mode, you need to copy the file to every node, or put it on a file system shared by all nodes, at the same path. Spark can then read that file; otherwise you should use HDFS.

I copied a txt file into HDFS, and Spark read the file from HDFS.

I copied a txt file to the shared file system of all nodes, and Spark read that file.

Both worked for me.
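Once the file exists at the same absolute path on every node, the URI passed to sc.textFile has to spell out that absolute path; note that file:///sales.csv refers to /sales.csv in the root directory. A minimal sketch of building such a URI, assuming a POSIX path without characters that need URL escaping (the helper name local_file_uri and the path in the usage comment are hypothetical):

```python
import os


def local_file_uri(path):
    """Return a file:// URI for the absolute form of a local POSIX path."""
    # abspath yields a leading "/", so the result has the file:///... form
    return "file://" + os.path.abspath(path)


# Usage in the PySpark shell (path is only an illustration):
# sc.textFile(local_file_uri("/home/hadoop/sales.csv")).count()
```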

Raghav
  • Can you post exactly what you did to read the file from HDFS? I am trying rdd = sc.textFile("hdfs://master:54310/cc-news-warc-paths1"), which does not help. – Ravi Ranjan Jul 31 '17 at 13:35
3

I had a similar problem. facha is correct that the data you are trying to load must be accessible across your cluster (to both the master and the executors).

I believe that in your case the file:/ path is still being resolved against your Hadoop HDFS, where the file doesn't exist. You can test this with the following command:

hadoop fs -cat yourfile.csv

I solved this problem by writing the file to HDFS and then reading it back from HDFS. Here is the code:

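import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// `html` (the JSON text to write) and `sql` (a SQLContext) are assumed to be defined earlier.
val conf = new Configuration()
val fs = FileSystem.get(conf)
val filenamePath = new Path("myfile.json")

// Remove any previous copy before writing the file again.
if (fs.exists(filenamePath)) {
  fs.delete(filenamePath, true)
}

// Write the content to the default file system (HDFS on a cluster).
val fin = fs.create(filenamePath)
fin.writeBytes(html)
fin.close()

// Read the file back through Spark SQL.
val metOffice = sql.read.json("myfile.json")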
andrew.butkus