
I'm used to programming in Python. My company now has a Hadoop cluster with Jupyter installed. Until now I had never used Spark / PySpark for anything.

I am able to load files from HDFS as easy as this:

text_file = sc.textFile("/user/myname/student_grades.txt")

And I'm able to write output like this:

text_file.saveAsTextFile("/user/myname/student_grades2.txt")

What I'm trying to achieve is to use a simple for loop to read the text files one by one and write their contents into one HDFS file. So I tried this:

list = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    text_file.saveAsTextFile("/user/myname/all.txt")

So this works for the first element of the list, but then gives me this error message:

Py4JJavaError: An error occurred while calling o714.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory 
XXXXXXXX/user/myname/all.txt already exists

To avoid confusion, I blurred out the IP address with XXXXXXXX.


What is the right way to do this? I will have tons of datasets (like 'text1', 'text2' ...) and want to perform a Python function on each of them before saving them to HDFS. But I would like to have the results all together in "one" output file.

Thanks a lot!
MG

EDIT: It seems like my final goal was not really clear. I need to apply a function to each text file separately, and then I want to append the output to the existing output directory. Something like this:

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    text_file = really_cool_python_function(text_file)
    text_file.saveAsTextFile("/user/myname/all.txt")
mgruber

4 Answers


I wanted to post this as a comment but could not do so, as I do not have enough reputation.

You have to convert your RDD to a DataFrame and then write it in append mode. To convert an RDD to a DataFrame, please look at this answer:
https://stackoverflow.com/a/39705464/3287419
or this link: http://spark.apache.org/docs/latest/sql-programming-guide.html
To save a DataFrame in append mode, the link below may be useful:
http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
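
For illustration, a minimal sketch of that idea, assuming a SparkSession (or SQLContext) is active in the notebook so that toDF() works; the output directory all_out and the column name line are just placeholders:

from pyspark.sql import Row

files = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

for name in files:
    rdd = sc.textFile("/user/myname/" + name)
    # wrap each line in a Row so the RDD becomes a single string-column DataFrame
    df = rdd.map(lambda l: Row(line=l)).toDF()
    # "append" adds new part files to an existing directory instead of failing on it
    df.write.mode("append").text("/user/myname/all_out")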

Almost the same question is asked here as well: Spark: Saving RDD in an already existing path in HDFS. But the answer provided there is for Scala. I hope something similar can be done in Python too.

There is yet another (but ugly) approach. Convert your RDD to a string; let the resulting string be resultString. Use subprocess to append that string to the destination file, i.e.

subprocess.call("echo "+resultString+" | hdfs dfs -appendToFile - <destination>", shell=True)
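
Expanded a little, as a hedged sketch (collect() only makes sense for small data, and the destination here is a plain HDFS file, not a saveAsTextFile directory; the path all_combined.txt is just a placeholder):

import subprocess

# pull the RDD's lines to the driver and join them into one string (small data only)
resultString = "\n".join(text_file.collect())

# "hdfs dfs -appendToFile - <dst>" reads from stdin and appends it to an HDFS file
p = subprocess.Popen(
    ["hdfs", "dfs", "-appendToFile", "-", "/user/myname/all_combined.txt"],
    stdin=subprocess.PIPE,
)
p.communicate(resultString.encode("utf-8"))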
asanand

You can read multiple files and save them like this:

textfile = sc.textFile(','.join(['/user/myname/'+f for f in list]))
textfile.saveAsTextFile('/user/myname/all')

You will get all the part files within the output directory.
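
If each file needs its own processing first (as in the EDIT), one possible variation on this idea, shown only as a sketch, is to build one RDD per file, apply the function, and union them before the single save (really_cool_python_function is the asker's placeholder):

files = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

# process each file separately, then combine the per-file results into one RDD
rdds = [really_cool_python_function(sc.textFile('/user/myname/' + f)) for f in files]
sc.union(rdds).saveAsTextFile('/user/myname/all')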

Suresh
  • It seems like my final goal was not really clear. I need to apply a function to each text file separately, and then I want to append the output to the existing output directory. See the EDIT – mgruber May 12 '17 at 14:28
  • Same function for all text files? – Suresh May 13 '17 at 15:22
  • Yes, the same function for all files, but I can't join the text files beforehand because each file needs to be treated separately – mgruber May 15 '17 at 07:12
  • Will the columns in all the files be similar or different? – Suresh May 16 '17 at 15:02

If the text files all have the same schema, you could use Hive to read the whole folder as a single table, and directly write that output.
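
A minimal sketch of that idea from the Spark side, assuming Spark 2.x with Hive support and tab-separated files that all share the same columns (the folder path, separator and output directory are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# a whole HDFS folder can be read as one DataFrame, much like a Hive external table
df = spark.read.csv("/user/myname/input_folder/", sep="\t")

df.write.mode("overwrite").csv("/user/myname/all_out")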

Henry

I would try this, it should be fine:

list = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt']

for i in list:
    text_file = sc.textFile("/user/myname/" + i)
    text_file.saveAsTextFile(f"/user/myname/{i}")
Eimis Pacheco