I'm trying to create a Spark RDD from several JSON files packed into a gzipped tar archive. For example, I have 3 files

file1.json
file2.json
file3.json

And these are contained in archive.tar.gz.

I want to create a DataFrame from the JSON files. The problem is that Spark does not read them correctly: creating an RDD with sqlContext.read.json("archive.tar.gz") or sc.textFile("archive.tar.gz") produces garbled output with extra content.

Is there some way to handle gzipped archives containing multiple files in Spark?

UPDATE

Using the method given in the answer to Read whole text files from a compression in Spark, I was able to get things running, but this method does not seem suitable for large tar.gz archives (>200 MB compressed), as the application chokes on large archives. Some of the archives I'm dealing with reach up to 2 GB after compression, so I'm wondering whether there is an efficient way to deal with the problem.

I'm trying to avoid extracting the archives and then merging the files together, as this would be time-consuming.

zenofsahil
  • I'm facing the same problem with large files, did you find a solution (other than unzipping manually and then loading to spark)? :) – ixaxaar Jan 16 '17 at 12:29
  • @ixaxaar, I ended up converting all the tar archives to hadoop sequence files which Spark works with nicely. https://stuartsierra.com/2008/04/24/a-million-little-files – zenofsahil Jan 16 '17 at 21:26
  • thanks a lot for the jar! I actually have 50 tar files, each with a million small (json) files :D – ixaxaar Jan 16 '17 at 21:37
  • There you go then. :) – zenofsahil Jan 16 '17 at 21:47

2 Answers

A solution is given in Read whole text files from a compression in Spark. Using the code sample provided there, I was able to create a DataFrame from the compressed archive like so:

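// extractFiles and decode are the helpers defined in the linked answer.
// binaryFiles yields one (path, stream) pair per archive.
val jsonRDD = sc.binaryFiles("gzarchive/*").
               flatMapValues(x => extractFiles(x).toOption).
               mapValues(_.map(decode()))

// Drop the archive path and flatten the decoded file contents into one RDD[String].
val df = sqlContext.read.json(jsonRDD.map(_._2).flatMap(x => x))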

This method works fine for relatively small tar archives, but it is not suitable for large ones, because each archive is read and unpacked as a whole by a single task.
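For readers who do not have the linked answer at hand, the unpacking step could look roughly like the sketch below. It is not the code from that answer: extractEntries, jsonLines and jsonLinesFromTarGz are hypothetical names, and it assumes Apache Commons Compress is on the classpath, that the files are UTF-8, and that each file holds one JSON document per line. Every archive is still unpacked in memory by one task, so the size limitation described above applies here too.

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper: unpack every non-directory entry of a gzipped tar
// stream into (entry name, UTF-8 text). The whole archive is held in memory.
def extractEntries(in: InputStream): Vector[(String, String)] = {
  val tar = new TarArchiveInputStream(new GZIPInputStream(in))
  val files = Vector.newBuilder[(String, String)]
  try {
    var entry = tar.getNextTarEntry
    while (entry != null) {
      if (!entry.isDirectory) {
        val out = new ByteArrayOutputStream()
        val buf = new Array[Byte](8192)
        var n = tar.read(buf) // read() returns -1 at the end of the current entry
        while (n != -1) {
          out.write(buf, 0, n)
          n = tar.read(buf)
        }
        files += entry.getName -> new String(out.toByteArray, StandardCharsets.UTF_8)
      }
      entry = tar.getNextTarEntry
    }
    files.result()
  } finally tar.close()
}

// Assumption: one JSON document per line; blank lines are dropped.
def jsonLines(text: String): Seq[String] =
  text.split("\\r?\\n").toSeq.filter(_.trim.nonEmpty)

// One task per archive: open the stream, unpack it, emit the JSON lines.
def jsonLinesFromTarGz(sc: SparkContext, path: String): RDD[String] =
  sc.binaryFiles(path).flatMap { case (_, stream) =>
    extractEntries(stream.open()).flatMap { case (_, text) => jsonLines(text) }
  }
```

With these definitions, sqlContext.read.json(jsonLinesFromTarGz(sc, "gzarchive/*")) would build the DataFrame.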

A better solution seems to be to convert the tar archives to Hadoop SequenceFiles, which, unlike tar.gz archives, are splittable and can therefore be read and processed in parallel in Spark.

See: A Million Little Files – Digital Digressions by Stuart Sierra.
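A rough sketch of such a conversion follows. It is not the tool from the linked article: tarGzToSequenceFile, readEntry and jsonFromSequenceFile are hypothetical names, the sketch assumes Apache Commons Compress and the Hadoop client libraries are on the classpath, and it stores each file as a Text value keyed by its entry name, which assumes UTF-8 content. The conversion streams one entry at a time, so memory use is bounded by the largest single file rather than by the archive.

```scala
import java.io.{ByteArrayOutputStream, FileInputStream, InputStream}
import java.util.zip.GZIPInputStream

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{SequenceFile, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Read the remaining bytes of the current tar entry.
def readEntry(in: InputStream): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val buf = new Array[Byte](8192)
  var n = in.read(buf)
  while (n != -1) {
    out.write(buf, 0, n)
    n = in.read(buf)
  }
  out.toByteArray
}

// Hypothetical converter: one SequenceFile record per file in the archive,
// key = entry name, value = file content. Returns the number of records written.
def tarGzToSequenceFile(tarGzPath: String, seqFilePath: String): Int = {
  val tar = new TarArchiveInputStream(new GZIPInputStream(new FileInputStream(tarGzPath)))
  val writer = SequenceFile.createWriter(
    new Configuration(),
    SequenceFile.Writer.file(new Path(seqFilePath)),
    SequenceFile.Writer.keyClass(classOf[Text]),
    SequenceFile.Writer.valueClass(classOf[Text]))
  var written = 0
  try {
    var entry = tar.getNextTarEntry
    while (entry != null) {
      if (!entry.isDirectory) {
        writer.append(new Text(entry.getName), new Text(readEntry(tar)))
        written += 1
      }
      entry = tar.getNextTarEntry
    }
    written
  } finally {
    writer.close()
    tar.close()
  }
}

// Spark can split the SequenceFile across tasks. Hadoop reuses the Text
// objects between records, so each value is copied to a String right away.
def jsonFromSequenceFile(sc: SparkContext, seqFilePath: String): RDD[String] =
  sc.sequenceFile(seqFilePath, classOf[Text], classOf[Text]).map { case (_, v) => v.toString }
```

The resulting RDD can be passed to sqlContext.read.json in the same way as above, provided each record is a JSON document that the reader accepts.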

zenofsahil
Files inside a *.tar.gz archive are, as you have already mentioned, compressed. You cannot put the 3 files into a single compressed tar file and expect the import function (which expects plain text) to know how to decompress the archive, unpack the files from the tar container, and then import each file individually.

I would recommend taking the time to upload each JSON file individually, since neither sc.textFile nor sqlContext.read.json can unpack a tar archive. Both can read an individually gzipped text file, but not the tar container wrapped around several files.

DJHenjin