
I wanted to save my PySpark output into a .txt file for future reference, and wrote the following code to save it:

fileName = names1[i] + ".txt"  # generating the file name as fieldname.txt
# data1.groupby(names1[i]).agg(F.collect_set("Passenger_Id")).rdd.saveAsTextFile(names1[i] + '.txt')
data.groupby(names1[i]).agg(F.collect_set("Passenger_Id")).rdd.saveAsTextFile(fileName)

But after running the code I see a folder for each file name. For example, if my file name is abc.txt, I get a folder named abc.txt containing lots of part files with no extension. Here is a sample line from one of my part files:

Row(Airpotr=u'ST', collect_set(Passenger_Id)=[u'30143072', u'36374515', u'45806865', u'37771107', u'18541154', u'91481534', u'30343069', u'41482082'])

How can I read these part files back together and create a Spark DataFrame?

I also tried following the steps mentioned here:

import os
home = os.getcwd()
names1 = "Airpotr.txt"
dirPath = os.path.join(home, names1)
os.mkdir(dirPath)
textFiles = sc.wholeTextFiles(dirPath)
sorted(textFiles.collect())

but got this error message:

OSError: [Errno 17] File exists: '/user-home/.../Airpotr.txt'
– Python Learner

1 Answer


A .txt file is not the right format for reloading your data directly into a DataFrame. You should use Parquet or another columnar storage format. To store and load your data:

data.write.parquet(fileName)
data = spark.read.parquet(fileName)
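
Applied to the loop from the question, a minimal sketch (assuming `data`, `names1`, `F` and the `spark` session are defined as above) could look like:

for name in names1:
    # write each grouped result as Parquet instead of saveAsTextFile
    grouped = data.groupby(name).agg(F.collect_set("Passenger_Id"))
    grouped.write.parquet(name + ".parquet")

# later, read one of them back into a DataFrame
df = spark.read.parquet(names1[0] + ".parquet")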

But since your data is already stored as text files, here is how to load it back.

1. Loading all your data at once

You can load all your files in one go by passing a comma-separated list of paths (glob patterns also work) to the loading function:

rdd = sc.textFile(",".join([name + ".txt" for name in names1]))

This will load your flat files into an RDD where each row is a plain string:

"Row(Airpotr=u'ST', collect_set(Passenger_Id)=[u'30143072', u'36374515', u'45806865', u'37771107', u'18541154', u'91481534', u'30343069', u'41482082'])"

2. Converting the RDD to a dataframe

Since the first field name is different in each file, we'll use wholeTextFiles instead of textFile to load the data. This function adds the path of the file as the first element of each row. We'll then use a custom function to parse each row:

import re
from pyspark.sql import Row

def parse_line(l):
    # l is a (file_path, file_content) pair produced by wholeTextFiles
    name = re.findall(r'.*/(.*?)\.txt/', l[0])[0]  # recover the field name from the path
    line = re.findall(name + r"=u'(.*)', collect_set\(Passenger_Id\)=\[u'(.*)'\]", l[1])
    return Row(
        key=line[0][0],
        values=line[0][1].split("', u'"))

data = sc.wholeTextFiles(",".join([output_path + f + ".txt" for f in filenames]))\
    .filter(lambda l: l[1] != "")\
    .map(parse_line)\
    .toDF()
data.show()
+---+--------------------------------------------------------------------------------+
|key|values                                                                          |
+---+--------------------------------------------------------------------------------+
|ST |[30143072, 36374515, 45806865, 37771107, 18541154, 91481534, 30343069, 41482082]|
+---+--------------------------------------------------------------------------------+
– MaFF
  • Hi Marie, I'm trying to use the approach you mentioned for converting the RDD to a dataframe, but I'm getting the following error after running the code: `AttributeError: 'PipelinedRDD' object has no attribute 'toDF'`. Kindly suggest – Python Learner Oct 27 '17 at 10:41
  • Any suggestion please? – Python Learner Oct 28 '17 at 17:36
  • You need an `SQLContext` to be able to use it (included in the `SparkSession` in Spark >2): `sqlContext = SQLContext(sc)` (see the sketch after these comments). https://stackoverflow.com/questions/32788387/pipelinedrdd-object-has-no-attribute-todf-in-pyspark – MaFF Oct 29 '17 at 12:25
  • Thanks Marie. I have done it another way. I used the following code `sc.textFile(name).take(5)` – Python Learner Oct 30 '17 at 16:04
  • `.take()` outputs a list not a dataframe – MaFF Oct 30 '17 at 17:45
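
For reference, a minimal sketch of the fix MaFF points to above (assuming Spark 1.x where no session exists yet, and reusing `output_path`, `filenames` and `parse_line` from the answer):

from pyspark.sql import SQLContext

# toDF() is only available once an SQLContext (or a SparkSession in Spark 2+) has been created
sqlContext = SQLContext(sc)

df = sc.wholeTextFiles(",".join([output_path + f + ".txt" for f in filenames]))\
    .filter(lambda l: l[1] != "")\
    .map(parse_line)\
    .toDF()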