6

I have a folder which contains many small .gz files (compressed csv text files). I need to read them in my Spark job, but the thing is I need to do some processing based on info which is in the file name. Therefore, I did not use:

JavaRDD<<String>String> input = sc.textFile(...)

since to my understanding I do not have access to the file name this way. Instead, I used:

JavaPairRDD<<String>String,String> files_and_content = sc.wholeTextFiles(...);

because this way I get a pair of file name and the content. However, it seems that this way, the input reader fails to read the text from the gz file, but rather reads the binary Gibberish.

So, I would like to know if I can set it to somehow read the text, or alternatively access the file name using sc.textFile(...)

Belphegor
  • 4,456
  • 11
  • 34
  • 59
Yaniv Donenfeld
  • 565
  • 2
  • 8
  • 26
  • 2
    My solution isn't particularly good, I think you should try and make a feature request to spark, this is definitely a useful feature – aaronman Jun 25 '14 at 21:29

2 Answers2

2

You cannot read gzipped files with wholeTextFiles because it uses CombineFileInputFormat which cannot read gzipped files because they are not splittable (source proving it):

  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[String, String] = {

    new CombineFileRecordReader[String, String](
      split.asInstanceOf[CombineFileSplit],
      context,
      classOf[WholeTextFileRecordReader])
  }

You may be able to use newAPIHadoopFile with wholefileinputformat (not built into hadoop but all over the internet) to get this to work correctly.

UPDATE 1: I don't think WholeFileInputFormat will work since it just gets the bytes of the file, meaning you may have to write your own class possibly extending WholeFileInputFormat to make sure it decompresses the bytes.

Another option would be to decompress the bytes yourself using GZipInputStream

UPDATE 2: If you have access to the directory name like in the OP's comment below you can get all the files like this.

Path path = new Path("");
FileSystem fileSystem = path.getFileSystem(new Configuration()); //just uses the default one
FileStatus []  fileStatuses = fileSystem.listStatus(path);
ArrayList<Path> paths = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) paths.add(fileStatus.getPath());
Community
  • 1
  • 1
aaronman
  • 18,343
  • 7
  • 63
  • 78
  • thanks aaronman.. It seems that decompressing the data myself does not work. I think it's because representing the data as string may have tampered with the gzipped data representation, I am getting an IOException of "Not in GZIP format"... I saw somewhere I can access file properties by subclassing HadoppRDD, but I'm not really sure yet how.. – Yaniv Donenfeld Jun 29 '14 at 05:28
  • @YanivDonenfeld did you wrap it in a ByteArrayInputStream like the question I pointed you to says? – aaronman Jun 29 '14 at 16:17
  • Yes, I did.. try{ log.warn("Gzipped string is: " + gzipped); byte[] gzippedByteArray = gzipped.getBytes("ISO-8859-1"); byte[] buffer = new byte[1024]; java.io.ByteArrayInputStream bytein = new java.io.ByteArrayInputStream(gzippedByteArray); java.util.zip.GZIPInputStream gzin = new java.util.zip.GZIPInputStream(bytein); int length; while ((length = gzin.read(buffer)) > 0) { byteout.write(buffer, 0, length); } } – Yaniv Donenfeld Jun 29 '14 at 20:06
  • @YanivDonenfeld sorry can't think of a reason why there would be an erro – aaronman Jun 29 '14 at 20:18
  • By the way, another suggestion I got somewhere was to get the list of files under input folder, then use parallelize() on the list, and on worker side, read each file content through a Hadoop FileSystem object, reading explicitly as bytes and/or feeding directly into the GZIPInputStream. My question is: how can I read the list of filenames under the input folder? – Yaniv Donenfeld Jul 01 '14 at 21:30
  • @YanivDonenfeld also if I actually helped u consider upvoting/accepting my answer – aaronman Jul 02 '14 at 03:01
  • aaronman update 2 was definitely a good answer, and I was able to retrieve the list of files. Then, parallelize on the list, and started doing the transformations. However, eventually I had to take a different path. The reason is I used GZipInputFormat to read the data (the files are gzipped), and for some reason, it tampered with the UTF-8 encoding for some special characters (replaced it with \u0019). Eventually I found a way to avoid getting the info from file name by forcing more assumptions on my job's input data, and then doing a regular sc.textFile(), which read the gz smoothly. – Yaniv Donenfeld Jul 03 '14 at 06:40
  • sorry, I can't seem to tag you with @ – Yaniv Donenfeld Jul 03 '14 at 06:43
  • @YanivDonenfeld glad to hear you solved your problem – aaronman Jul 03 '14 at 13:30
0

I faced the same issue while using spark to connect to S3.

My File was a gzip csv with no extension .

JavaPairRDD<String, String> fileNameContentsRDD = javaSparkContext.wholeTextFiles(logFile);

This approach returned currupted values

I solved it by using the the below code :

JavaPairRDD<String, String> fileNameContentsRDD = javaSparkContext.wholeTextFiles(logFile+".gz");

By adding .gz to the S3 URL , spark automatically picked the file and read it like gz file .(Seems a wrong approach but solved my problem .

Soner Gönül
  • 97,193
  • 102
  • 206
  • 364
Anshuman Ranjan
  • 180
  • 1
  • 1
  • 9