
I have the following problem: suppose I have a directory on HDFS that contains compressed directories, each of which holds multiple files. I want to create an RDD of objects of type T, i.e.:

context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();

    // Class T has a constructor that takes the content and the name of each
    // processed file (as two strings)
    T t = new T(content, fileName);

    return t;
});

Now, when inputDataPath is a directory containing files, this works perfectly fine, e.g. when it's something like:

String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders

But when there's a tgz containing multiple files, the file content (fileNameContent._2()) is a useless binary string (quite expected). I found a similar question on SO, but it's not the same case: there the solution only applies when each archive contains a single file, whereas in my case there are many files that I want to read individually as whole files. I also found a question about wholeTextFiles, but it doesn't work in my case either.

Any ideas how to do this?

EDIT:

I tried the TarballReader from here (testing it as in the function testTarballWithFolders()), but whenever I call

TarballReader tarballReader = new TarballReader(fileName);

I get a NullPointerException:

java.lang.NullPointerException
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
    at utils.TarballReader.<init>(TarballReader.java:61)
    at main.SparkMain.lambda$0(SparkMain.java:105)
    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Line 105 in SparkMain is the one I showed above in my edit of the post, and line 61 of TarballReader is

GZIPInputStream gzip = new GZIPInputStream(in);

which fails because the input stream in, obtained on the line above it, is null:

InputStream in = this.getClass().getResourceAsStream(tarball);

Am I on the right path here? If so, how do I continue? Why do I get this null value and how can I fix it?
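
My current guess: getResourceAsStream() resolves names on the classpath, not on HDFS, so for a file name like the ones above it comes back null, and the GZIPInputStream constructor then throws the NPE. I assume the file would have to be opened through Hadoop's FileSystem API instead; a minimal, untested sketch (the calls below are only my assumption of what is needed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

// getResourceAsStream(fileName) looks fileName up on the classpath and returns
// null when nothing matches; opening the file via Hadoop's FileSystem avoids that.
Path path = new Path(fileName);                           // e.g. an hdfs:// path
FileSystem fs = path.getFileSystem(new Configuration());  // filesystem for that URI
InputStream in = fs.open(path);                           // non-null if the file exists
GZIPInputStream gzip = new GZIPInputStream(in);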

  • Did you see: http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles – Eray Balkanli Apr 15 '16 at 17:02
  • Yes, I wrote in my question - it doesn't work in my case. In `paths` I get the path of the compression again, which is useless. – Belphegor Apr 15 '16 at 18:42
  • This is a duplicate of "What is a Null Pointer Exception and how do I avoid it" –  Apr 16 '16 at 16:33
  • @JarrodRoberson How so? It's about reading whole text files in Spark from a compression, the edit is just one way I tried to solve it, and just updated my question. It's not a duplicate. – Belphegor Apr 16 '16 at 16:39
  • You are passing a `null` reference as an InputStream; it says so right here: `java.lang.NullPointerException at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)`, so it is a duplicate of every other question about `NPE`. Make that reference not null and you have your solution; you have wasted `100` rep on this question. –  Apr 16 '16 at 17:12

2 Answers


One possible solution is to read data with binaryFiles and extract content manually.

Scala:

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    // flatten
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
      // Take as long as we've read something
      .takeWhile(_._1 > 0)
      .map(_._2)
      .flatten
      .toArray})
    .toArray
}

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x =>
  extractFiles(x).toOption).mapValues(_.map(decode()))

This needs commons-compress on the classpath, e.g. with sbt:

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src
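
For reference, a rough Java sketch of the same approach might look like the following (untested; it reuses the JavaSparkContext context and inputDataPath from the question, and the buffer size and variable names are only illustrative):

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.input.PortableDataStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

JavaPairRDD<String, List<String>> extracted = context
    .binaryFiles(inputDataPath)
    .mapValues(ps -> {
        List<String> files = new ArrayList<>();
        try (TarArchiveInputStream tar =
                 new TarArchiveInputStream(new GzipCompressorInputStream(ps.open()))) {
            TarArchiveEntry entry;
            // Iterate over the archive entries until getNextTarEntry() returns null
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue; // keep only regular files
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                // Read the bytes of the current entry from the shared tar stream
                while ((read = tar.read(buffer, 0, buffer.length)) > 0) {
                    out.write(buffer, 0, read);
                }
                files.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
            }
        }
        return files;
    });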

Python:

import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
  • It can be useful once in a while. At the end of the day the only problem here is the Java / Scala IO API, which can be a little bit heavy for the job. – zero323 Apr 19 '16 at 20:53
  • The Scala code seems to be extremely slow on many files (~6GB total). Is there any way to improve this? In Python I get better performance, but then I need many int casts, which are terribly slow in Python. – Niros Aug 23 '16 at 15:11
  • @Niros You can probably drop to plain loops, but I am not aware of any method that can make it significantly faster. I never looked though, so there may be something out there. – zero323 Aug 25 '16 at 17:59
  • I have a similar scenario to solve, except that I do not have to read all the files from the .tar.gz files, only those with the extension .nxml. How can I do it? – Ravi Ranjan Mar 09 '17 at 17:32
  • @RaviRanjan https://commons.apache.org/proper/commons-compress/apidocs/org/apache/commons/compress/archivers/tar/TarArchiveEntry.html#getName-- – zero323 Mar 09 '17 at 18:00
  • I guess this method will only take advantage of a single machine, right? So this may not be a good option if the file is as large as 10G and I only have a weak single machine? – G_cy Jun 01 '17 at 09:59
  • @G_cy gzip compression is unsplittable, so effectively it is not useful for storing large files for processing with Spark / Hadoop. Which solution you use doesn't really matter, I am afraid. A single gzip will always be processed by a single machine. – zero323 Jun 01 '17 at 10:53
  • @zero323 I thought gzip would be splittable because the former Parquet codec is gzip; that's weird. Actually, I tried your code with the bz codec and still got only one task running. – G_cy Jun 02 '17 at 01:37
  • It is not. Take a look at https://issues.apache.org/jira/browse/HADOOP-7076 for example. And if you have one input file, then one task is expected. – zero323 Jun 02 '17 at 08:39
  • @zero323 I checked this JIRA too; it is marked as resolved. Could I regard gzip as splittable? And I think the bzip codec is splittable, but in practice with the code above I still get only one task. Is that expected? – G_cy Jun 06 '17 at 02:33
  • @G_cy JIRA is resolved as _later_ and its goal was not to make files splittable, but to unpack files before passing to Hadoop. Splittability is a property of the compression format, not input format implementation. – zero323 Jun 06 '17 at 17:53
  • Has anyone tried the Python version? When I try it on a tar.gz file with one simple csv file, I get the error "ValueError: embedded null byte" if I try to foreach(print). Any idea? – Luke Apr 19 '19 at 15:25
  • Spark supports reading gzip compressed files directly. I tried with JSON and it worked: https://stackoverflow.com/a/49502965/5986661 – Omkar Neogi Jan 23 '20 at 15:55

A slight improvement on the accepted answer is to change

Option(tar.getNextTarEntry)

to

Try(tar.getNextTarEntry).toOption.filter(_ != null)

to contend with malformed / truncated .tar.gzs in a robust way.

BTW, is there anything special about the size of the buffer array? Would it be faster on average if it were closer to the average file size, maybe 500k in my case? Or is the slowdown I am seeing more likely the overhead of Stream relative to a more Java-ish while loop?

Neil Best