2

I would like to do streaming from tar-gzip files (tgz) which include my actual CSV stored data.

I already managed to do structured streaming with spark 2.2 when my data comes in as CSV files, but actually, the data comes in as gzipped csv files.

Is there a way that the trigger done by structured streaming does an decompress before handling the CSV stream?

The code I use to process the files is this:

val schema = Encoders.product[RawData].schema
val trackerData = spark
  .readStream
  .option("delimiter", "\t")
  .schema(schema)
  .csv(path)
val exceptions = rawCientData
  .as[String]
  .flatMap(extractExceptions)
  .as[ExceptionData]

produced output as expected when path points to csv files. But I would like to use tar gzip files. When I try to place those files at the given path, I do not get any exceptions and batch output tells me

  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 1095,
    "processedRowsPerSecond" : 211.0233185584891
  } ],

But I do not get any actual data processed. Console sink looks like this:

+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+
zero323
  • 322,348
  • 103
  • 959
  • 935
Matthias Mueller
  • 102
  • 2
  • 12

2 Answers2

2

I solved the part of reading .tar.gz (.tgz) files this way: Inspired by this site I created my own TGZ codec

final class DecompressTgzCodec extends CompressionCodec {
  override def getDefaultExtension: String = ".tgz"

  override def createOutputStream(out: OutputStream): CompressionOutputStream = ???
  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???
  override def createCompressor(): Compressor = ???
  override def getCompressorType: Class[_ <: Compressor] = ???

  override def createInputStream(in: InputStream): CompressionInputStream = {
    new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))
  }
  override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream = createInputStream(in)

  override def createDecompressor(): Decompressor = null
  override def getDecompressorType: Class[_ <: Decompressor] = null

  final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
    def updateStream(): Unit = {
      // still have data in stream -> done
      if (in.available() <= 0) {
        // create stream content from following tar elements one by one
        in.getNextTarEntry()
      }
    }

    override def read: Int = {
      checkStream()
      updateStream()
      in.read()
    }

    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      checkStream()
      updateStream()
      in.read(b, off, len)
    }

    override def resetState(): Unit = {}
  }
}

And registered it for use by spark.

val conf = new SparkConf()
conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config(conf)
  .appName("Streaming Example")
  .getOrCreate()

Works exactly like I wanted it to do.

Matthias Mueller
  • 102
  • 2
  • 12
  • can you share the github location, if it exists. I'd like to check out all of the imports and dependencies? – Douglas M Dec 31 '20 at 05:17
1

I do not think reading tar.gz'ed files is possible in Spark (see Read whole text files from a compression in Spark or gzip support in Spark for some ideas).

Spark does support gzip files, but they are not recommended as not splittable and result in a single partition (that in turn makes Spark of little to no help).

In order to have gzipped files loaded in Spark Structured Streaming you have to specify the path pattern so the files are included in loading, say zsessionlog*.csv.gz or alike. Else, csv alone loads CSV files only.

If you insist on Spark Structured Streaming to handle tar.gz'ed files, you could write a custom streaming data Source to do the un-tar.gz.

Given gzip files are not recommended as data format in Spark, the whole idea of using Spark Structured Streaming does not make much sense.

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
  • 2
    Thanks Jacek, I voted for the answer, but due to my low reputation level it currently is not counted. BTW: Regarding Spark and using gzip files is not a matter regarding petitioning when you have > 200 gzip files. There is enough option to do partitioning :-) – Matthias Mueller Dec 30 '17 at 22:51
  • @MatthiasMueller Let's talk about it under the other question (and let's make it more complete then) --> https://stackoverflow.com/q/47604184/1305344. Deal? – Jacek Laskowski Jan 03 '18 at 09:43
  • 1
    Yes, would be great, but I can not add any comment on the other question or answer because I only have less than 50 reputations :-( – Matthias Mueller Jan 03 '18 at 10:10
  • What are the questions then? Let's work them out together here and I'll be updating the right question @ https://stackoverflow.com/q/47604184/1305344. – Jacek Laskowski Jan 03 '18 at 10:45
  • ok, let's start: When I implement `org.apache.spark.sql.execution.streaming.Source`what wouild be the minimalistic schema? `StructType(StructField("value", StringType) :: Nil)` And what are the semantics of `getOffset` and `getBatch`? – Matthias Mueller Jan 04 '18 at 07:02
  • @JacekLaskowski: Just want to echo the answer from Matthias Mueller. Though a single gzip file result in a single partition when reading as a regular DF but this is read stream and there're multiple files coming in and normally each file is small in Streaming so this does not matter – James Nguyen Aug 23 '19 at 17:41