I have some files (part-00000.gz, part-00001.gz, part-00002.gz, ...) and each part is rather large. I need the filename of each part because it contains timestamp information. As far as I know, wholeTextFiles seems to be the only way in pyspark to read input as (filename, content) pairs. However, I get an out-of-memory error when using wholeTextFiles, so my guess is that wholeTextFiles reads a whole part as the content in a mapper without any partitioning. I also found this answer (How does the number of partitions affect `wholeTextFiles` and `textFiles`?). If so, how can I get the filename of a rather large part file? Thanks
1 Answer
You get the error because wholeTextFiles tries to read each entire file into memory as a single record. You're better off reading the file line by line, which you can do simply by writing your own generator and passing it to the flatMap function. Here's an example of doing that to read a gzip file:
import glob
import gzip

def read_fun_generator(filename):
    # Stream the gzipped file line by line instead of loading it whole
    with gzip.open(filename, 'rb') as f:
        for line in f:
            yield line.strip()

gz_filelist = glob.glob("/path/to/files/*.gz")
rdd_from_gz = sc.parallelize(gz_filelist).flatMap(read_fun_generator)
-
I am working in Amazon S3. Does glob.glob work there? Your answer also seems to read all the lines of a file given the filename (part-00000). Should I replace gzip.open with sc.textFile? – ScutterKey Mar 30 '16 at 07:44
-
I want to use not only the filename but also the content. It seems that your answer still reads a whole part file into an RDD line by line. – ScutterKey Mar 30 '16 at 10:50
-
You'll have to tweak this code to do exactly what you want. The glob call is really just to get a list of file names, which is saved to `gz_filelist`. The idea is to create an RDD of filenames (which is what `parallelize` does), then for each filename read every line in that file. Note that you have access to the filename inside the generator. For example, you can do `yield filename + "|" + line.strip()` if you want to prepend the filename to every line (see the sketch below). – santon Mar 30 '16 at 22:59
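Putting santon's suggestion together, here is a minimal sketch of a generator that keeps the filename alongside each line, which is what the question asks for. The helper name read_with_filename and the (filename, line) tuple format are illustrative, not from the original answer, and this assumes the paths returned by glob are readable from the worker nodes:

import glob
import gzip
import os

def read_with_filename(filepath):
    # Hypothetical variant of read_fun_generator that yields
    # (filename, line) pairs so the timestamp in the name is kept.
    fname = os.path.basename(filepath)  # e.g. "part-00000.gz"
    with gzip.open(filepath, 'rb') as f:
        for line in f:
            yield (fname, line.strip())

gz_filelist = glob.glob("/path/to/files/*.gz")
rdd = sc.parallelize(gz_filelist).flatMap(read_with_filename)
# Each element of rdd is now a (filename, line) tuple.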