
I am new to Spark and Scala. We have ad event log files formatted as CSVs and then compressed using pkzip. I have seen many examples of how to decompress zipped files using Java, but how would I do this in Scala for Spark? Ultimately, we want to extract the data from each incoming file and load it into an HBase destination table. Can this be done with HadoopRDD? After this, we are going to introduce Spark Streaming to watch for these files.

Thanks, Ben

Ben

2 Answers


Default compression support

@samthebest's answer is correct if you are using a compression format that is available in Spark (Hadoop) by default. These are:

  • bzip2
  • gzip
  • lz4
  • snappy

I have explained this topic in more detail in my other answer: https://stackoverflow.com/a/45958182/1549135

Reading zip

However, if you are trying to read a zip file, you need to create a custom solution. One is mentioned in the answer I have already provided.

If you need to read multiple files from your archive, you might be interested in the answer I have provided: https://stackoverflow.com/a/45958458/1549135

Basically, it always comes down to using sc.binaryFiles and then decompressing the PortableDataStream, as in this sample:

import java.io.{ BufferedReader, InputStreamReader }
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream

sc.binaryFiles(path, minPartitions)
  .flatMap { case (name: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open)
    // walk every entry in the archive and read it line by line
    Stream.continually(zis.getNextEntry)
          .takeWhile(_ != null)
          .flatMap { _ =>
            val br = new BufferedReader(new InputStreamReader(zis))
            Stream.continually(br.readLine()).takeWhile(_ != null)
          }
  }
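
The expression above yields an ordinary RDD[String] of decompressed CSV lines, so the usual transformations apply. As a quick sketch, assuming the result was assigned to a val named zipLines (a name introduced here only for illustration):

val events = zipLines.map(_.split(","))                        // split each CSV line into columns
events.take(5).foreach(cols => println(cols.mkString(" | ")))  // peek at a few records on the driver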
Atais

In Spark, provided your files have the correct filename suffix (e.g. .gz for gzipped) and the format is supported by org.apache.hadoop.io.compress.CompressionCodecFactory, you can just use

sc.textFile(path)
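
For the gzipped ad-event CSVs from the question, a minimal sketch could look like this; the path and the comma delimiter are assumptions, not details from the original post:

val lines  = sc.textFile("hdfs:///ad-logs/*.csv.gz")  // codec is picked from the .gz suffix
val fields = lines.map(_.split(","))                  // each element is an already-decompressed CSV line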

UPDATE: At the time of writing there is a bug in the Hadoop bzip2 library, which means that trying to read bzip2 files with Spark results in weird exceptions, usually ArrayIndexOutOfBounds.

samthebest
It doesn't work for me. I have a zipped file (with the .zip extension) and doing `sc.textFile(path)` throws an exception... – mgaido Mar 12 '15 at 09:40