0

I need to read from a lot of gzips from hdfs, like this: sc.textFile('*.gz') while some of these gzips are corrupted, raises

java.io.IOException: gzip stream CRC failure

stops the whole processing running.

I read the debate here, where someone has the same need, but get no clear solution. Since it's not appropriate to achieve this function within spark (according to the link), is there any way just brutally skip corrupted files? There seem to have hints for scala user, no idea how to deal with it in python.

Or I can only detect corrupted files first, and delete them?

What if I have large amount of gzips, and after a day of running, find out the last one of them are corrupted. The whole day wasted. And having corrupted gzips are quite common.

zhangcx93
  • 41
  • 1
  • 5

1 Answers1

0

You could manually list all of the files and then read over the files in a map UDF. The UDF could then have try/except blocks to handles corrupted files.

The code would look something like

import gzip
from pyspark.sql import Row

def readGzips(fileLoc):
    try:
        ...
        code to read file
        ...
        return record
    except:
        return Row(failed=fileLoc)

from os import listdir
from os.path import isfile, join
fileList = [f for f in listdir(mypath) if isfile(join(mypath, f))]

pFileList = sc.parallelize(fileList)
dataRdd = pFileList.map(readGzips).filter((lambda x: 'failed' not in x.asDict()))
David
  • 11,245
  • 3
  • 41
  • 46
  • So the 'code to read file' part is to use python's gzip module to decompress the file? and the input is a list of string of paths, my files are on hdfs, how can I use raw python to read them? – zhangcx93 Apr 26 '16 at 02:11
  • Are you asking how to read a file off of hdfs? Or how to read a gzip? http://stackoverflow.com/questions/10566558/python-read-lines-from-compressed-text-files describes how to use gzip. And hdfs should be accessible with "hdfs://path/to/file.gz" – David Apr 26 '16 at 13:45
  • I mean in the readGzips(), you can only use python to access file from hdfs, with external package, because open('hdfs://file') will just raise IOError. Also, in such way, the mesos and spark can't assign job properly to make the machine with the file to deal with the data, thus way slower than normal file reading with sc.textFile(). – zhangcx93 Apr 27 '16 at 08:17
  • Fair points. But, I don't see another way to filter out corrupted files while reading in Spark. Preprocessing & removing corrupted files would be your other option. Before doing so, it might be worth doing a timing test and seeing how much slower this methodology is vs sc.textFile() on a batch of known, non-corrupted gzips. That could help determine whether preprocessing is worthwhile. – David Apr 27 '16 at 13:42