Apache Spark default compression support
I have written all the necessary theory in another answer, which you might want to refer to: https://stackoverflow.com/a/45958182/1549135
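In short, Spark (via Hadoop's input formats) transparently decompresses codecs such as gzip and bzip2 when reading text, so no special handling is needed for those. A minimal sketch, where `path/to/logs.gz` is a hypothetical gzipped text file:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("compressed-read").setMaster("local[*]"))

// Spark recognizes the .gz extension and decompresses on the fly;
// no extra code is needed for codecs Hadoop knows about.
// Caveat: gzip is not splittable, so each .gz file becomes one partition.
val lines = sc.textFile("path/to/logs.gz")
lines.take(5).foreach(println)
```

Zip archives, however, are not supported out of the box, hence the workaround below.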
Read a zip containing multiple files
I followed the advice given by @Herman and used ZipInputStream. This gave me the following solution, which returns an RDD[String] of the zip's contents.
```scala
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream

import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.endsWith(".zip")) {
      // Zip archives are not splittable, so read each file as a single binary blob.
      sc.binaryFiles(path, minPartitions)
        .flatMap { case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          // Iterate over the archive's entries and close the stream
          // once getNextEntry returns null (no entries left).
          Stream.continually(zis.getNextEntry)
            .takeWhile {
              case null => zis.close(); false
              case _    => true
            }
            .flatMap { _ =>
              // Read the current entry line by line; ZipInputStream serves
              // the bytes of whichever entry is currently open.
              val br = new BufferedReader(new InputStreamReader(zis))
              Stream.continually(br.readLine()).takeWhile(_ != null)
            }
        }
    } else {
      // Non-zip paths fall back to plain text reading.
      sc.textFile(path, minPartitions)
    }
  }
}
```
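Note that both `Stream.continually` calls are lazy: entries and lines are pulled only as Spark consumes the resulting iterator, so the archive is never materialized in memory all at once. `zis.close()` runs as soon as `getNextEntry` returns null, i.e. after the last entry has been read.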
Simply use it by importing the implicit class and calling the readFile method on SparkContext:
```scala
import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
```
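For context, a complete local run might look like the sketch below; the `data/archive.zip` and `data/plain.txt` paths are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.github.atais.spark.Implicits.ZipSparkContext

val sc = new SparkContext(
  new SparkConf().setAppName("zip-read").setMaster("local[*]"))

// Zip archives go through the ZipInputStream branch...
val zipped = sc.readFile("data/archive.zip")
// ...while any other path falls back to plain sc.textFile.
val plain = sc.readFile("data/plain.txt")

println(s"Lines inside the zip: ${zipped.count()}")
```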