I have an S3 bucket that is filled with gzip files that have no file extension, for example s3://mybucket/1234502827-34231

sc.textFile uses the file extension to select the decompression codec. I have found many blog posts on handling custom file extensions, but nothing about missing file extensions.

I think the solution may be sc.binaryFiles and unzipping the file manually.

Another possibility is to figure out how sc.textFile detects the file format. I'm not clear on how these classOf[] calls work.

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
jspooner
  • `sc.textFile` doesn't determine the format. It is done by `TextInputFormat` and only the extension is used. – user6022341 Dec 10 '16 at 13:16
  • Or rename all the files in s3, adding the `.gz`. I had a look at the source and it's implemented here: https://hadoop.apache.org/docs/stable/api/src-html/org/apache/hadoop/io/compress/CompressionCodecFactory#line.191 It really does use the file extension. The spec suggests you could just look at the first couple of bytes http://www.zlib.org/rfc-gzip.html#file-format , but this suggests you can get false positives and have to consider endian https://stackoverflow.com/questions/6059302/how-to-check-if-a-file-is-gzip-compressed so no doubt just using the `.gz` is a faster, reliable convention – Davos Jul 12 '17 at 15:01
  • @user6022341 `TextInputFormat` is not doing it, it's the `getCodec(Path file)` method in this Class `org.apache.hadoop.io.compress.CompressionCodecFactory` – Davos Jul 12 '17 at 15:04
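
To illustrate what the comments above describe: a minimal sketch (not part of the original post; the data.gz name is just for contrast) showing that Hadoop's CompressionCodecFactory.getCodec matches on the filename suffix alone, which is why an extension-less gzip file ends up with no codec:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

val factory = new CompressionCodecFactory(new Configuration())

// ".gz" matches the suffix registered by GzipCodec, so a codec is returned.
factory.getCodec(new Path("s3://mybucket/data.gz"))

// No registered suffix matches here, so getCodec returns null and the bytes
// are handed to TextInputFormat uncompressed.
factory.getCodec(new Path("s3://mybucket/1234502827-34231"))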

4 Answers

Can you try combining the solution below for ZIP files with the gzipFileInputFormat library?

Here, in How to open/stream .zip files through Spark?, you can see how to do it for ZIP files:

rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

gzipFileInputFormat:

https://github.com/bsankaran/internet_routing/blob/master/hadoop-tr/src/main/java/edu/usc/csci551/tools/GZipFileInputFormat.java

Some details about newAPIHadoopFile() can be found here: http://spark.apache.org/docs/latest/api/python/pyspark.html
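
A rough sketch of what that combination might look like in Scala, assuming GZipFileInputFormat from the repository above is a new-API (org.apache.hadoop.mapreduce) input format that, like the ZIP example, yields Text/Text pairs; check the class itself for its actual key and value types:

import org.apache.hadoop.io.Text
import edu.usc.csci551.tools.GZipFileInputFormat // hypothetical import, from the repo linked above

val rdd = sc.newAPIHadoopFile(
  "s3://mybucket/1234502827-34231",
  classOf[GZipFileInputFormat],
  classOf[Text], // assumed key type
  classOf[Text], // assumed value type
  sc.hadoopConfiguration)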

Yaron

I found several examples out there that almost fit my needs. Here is the final code I used to parse a gzip-compressed file.

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractBSM(ps: PortableDataStream, n: Int = 1024) = Try {
  val gz = new GzipCompressorInputStream(ps.open)
  Stream.continually {
    // Read n bytes
    val buffer = Array.fill[Byte](n)(-1)
    val i = gz.read(buffer, 0, n)
    (i, buffer.take(i))
  }
  // Take as long as we've read something
  .takeWhile(_._1 > 0)
  .map(_._2)
  .flatten
  .toArray
}
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, charset)
val inputFile = "s3://my-bucket/157c96bd-fb21-4cc7-b340-0bd4b8e2b614"
val rdd = sc.binaryFiles(inputFile).flatMapValues(x => extractBSM(x).toOption).map( x => decode()(x._2) )
val rdd2 = rdd.flatMap { x => x.split("\n") }
rdd2.take(10).foreach(println)
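
As a follow-up (not part of the original answer): if the bucket can also contain files that are not gzipped, one option is to peek at the gzip magic bytes (0x1f 0x8b, per RFC 1952) before running the stream through GzipCompressorInputStream. The isGzipped helper below is made up for illustration and reuses extractBSM and decode from above:

def isGzipped(ps: PortableDataStream): Boolean = {
  val in = ps.open()
  try {
    // gzip streams start with the two magic bytes 0x1f 0x8b
    val (b1, b2) = (in.read(), in.read())
    b1 == 0x1f && b2 == 0x8b
  } finally in.close()
}

// Decompress only the entries that look like gzip; pass the rest through untouched.
val bytesRdd = sc.binaryFiles("s3://my-bucket/")
  .flatMapValues(ps => if (isGzipped(ps)) extractBSM(ps).toOption else Some(ps.toArray()))
val textRdd = bytesRdd.map(x => decode()(x._2))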
jspooner
  • This works for GZ but we really need to check the magic bytes and apply the correct compression algo – jspooner Dec 10 '16 at 00:48
  • You should attribute the source. Also this is justified only for non-splittable formats (like gz) so generic solution with recognition doesn't make much sense. –  Dec 10 '16 at 13:20
  • This answer has a correctness bug! It's technically valid for a `read()` call to return `0` bytes even though we haven't hit the end of stream. I think this should read until you hit end-of-stream (when `-1` is returned from `read()`). Better yet, a helper function like Guava's `ByteStreams.toByteArray()`. – Josh Rosen Jun 07 '19 at 00:00

You can create your own custom codec for decoding your files. Start by extending GzipCodec and overriding the getDefaultExtension method so that it returns an empty string as the extension.

EDIT: That solution will not work in all cases due to how CompressionCodecFactory is implemented. For example, a codec for .lz4 is loaded by default. This means that if the name of a file you want to load ends with 4, that codec gets picked instead of the custom one (without an extension). Because that codec does not match the extension, it is later discarded and no codec is used.

Java:

package com.customcodec;

import org.apache.hadoop.io.compress.GzipCodec;

public class GzipCodecNoExtension extends GzipCodec {

    @Override
    public String getDefaultExtension() {
        return "";
    }
}

In your Spark app you just register the codec:

    SparkConf conf = new SparkConf()
            .set("spark.hadoop.io.compression.codecs", "com.customcodec.GzipCodecNoExtension");
Robert

You can read the binary files and do the decompression in a map function.

// Imports for the snippet (IOUtils here is assumed to be Apache Commons IO).
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;

JavaRDD<Tuple2<String, PortableDataStream>> rawData = spark.sparkContext().binaryFiles(readLocation, 1).toJavaRDD();

JavaRDD<String> decompressedData = rawData.map((Function<Tuple2<String, PortableDataStream>, String>) stringPortableDataStreamTuple2 -> {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPInputStream s = new GZIPInputStream(new ByteArrayInputStream(stringPortableDataStreamTuple2._2.toArray()));
    IOUtils.copy(s, out);

    return new String(out.toByteArray());
});

In the case of JSON content, you can read it into a Dataset using:

Dataset<Row> co = spark.read().json(decompressedData);
Robert