I would like to stream from tar-gzip files (tgz) that contain my actual data stored as CSV.
I already managed to do structured streaming with Spark 2.2 when my data comes in as plain CSV files, but in practice the data arrives as gzipped CSV files.
Is there a way for the structured streaming trigger to decompress the files before handling the CSV stream?
The code I use to process the files is this:
val schema = Encoders.product[RawData].schema
val trackerData = spark
  .readStream
  .option("delimiter", "\t")
  .schema(schema)
  .csv(path)

val exceptions = rawCientData
  .as[String]
  .flatMap(extractExceptions)
  .as[ExceptionData]
This produces the expected output when path points to CSV files. But I would like to use tar gzip files. When I place those files at the given path, I do not get any exceptions, and the batch output tells me
"sources" : [ {
  "description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
  "startOffset" : null,
  "endOffset" : {
    "logOffset" : 0
  },
  "numInputRows" : 1095,
  "processedRowsPerSecond" : 211.0233185584891
} ],
But no actual data gets processed. The console sink looks like this:
+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+
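For context, one workaround I am considering, in case the CSV source cannot unpack tar archives itself, is to extract each archive outside of the query into a staging directory and point readStream at that directory instead. A minimal sketch, assuming Apache Commons Compress is on the classpath; TgzUnpacker, unpack and stagingDir are hypothetical names of mine, not Spark API:

```scala
import java.io.{BufferedInputStream, InputStream}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.zip.GZIPInputStream

import scala.collection.mutable.ListBuffer

// Assumption: commons-compress is available as a dependency.
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

// Hypothetical helper, not part of Spark.
object TgzUnpacker {

  /** Extracts every regular file from a gzip-compressed tar stream into
    * stagingDir and returns the paths that were written.
    * Entry paths are flattened to their file name, so entries with the same
    * file name in different archive folders overwrite each other.
    */
  def unpack(in: InputStream, stagingDir: Path): List[Path] = {
    Files.createDirectories(stagingDir)
    val tar = new TarArchiveInputStream(
      new GZIPInputStream(new BufferedInputStream(in)))
    val written = ListBuffer.empty[Path]
    try {
      var entry = tar.getNextTarEntry
      while (entry != null) {
        if (entry.isFile) {
          // Keep only the file name so an entry cannot escape stagingDir.
          val target = stagingDir.resolve(Paths.get(entry.getName).getFileName)
          // Reading from `tar` stops at the end of the current entry.
          Files.copy(tar, target, StandardCopyOption.REPLACE_EXISTING)
          written += target
        }
        entry = tar.getNextTarEntry
      }
    } finally tar.close()
    written.toList
  }
}
```

The stream would then read from the staging directory, e.g. .csv(stagingDir.toString), instead of the directory holding the archives. It is probably safer to extract into a separate directory first and move the finished files into the watched one, so the stream never sees a half-written file. I would still prefer a way to let the trigger do the decompression itself.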