After referring to this post, I was able to read multiple *.txt files residing in a *.tar.gz file. However, I now need to read HDF5 files contained in a *.tar.gz file. A sample file, generated from the Million Songs Dataset, can be downloaded here. Could anyone tell me how I should change the following code in order to read the HDF5 files into an RDD? Thanks!
package a.b.c
import org.apache.spark._
import org.apache.spark.sql.{SQLContext, DataFrame}
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.input.PortableDataStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import scala.util.Try
import java.nio.charset._
object Main {
  /** Entry point: reads the files packed inside a *.tar.gz archive into a
    * DataFrame, one row per line of decoded text.
    */
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("lab1").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    import sqlContext._
    val inputpath = "path/to/millionsong.tar.gz"
    // binaryFiles yields (path, PortableDataStream) pairs; each archive is
    // unpacked into its member files, decoded to text, then split into lines.
    val rawDF = sc.binaryFiles(inputpath, 2)
      .flatMapValues(x => extractFiles(x).toOption)
      .mapValues(_.map(decode()))
      .map(_._2)
      .flatMap(x => x)
      .flatMap { x => x.split("\n") }
      .toDF()
  }

  /** Extracts every regular-file entry of a gzip-compressed tar archive.
    *
    * @param ps stream over the raw *.tar.gz bytes
    * @param n  read-buffer size in bytes
    * @return Success with one byte array per archive entry, or Failure on I/O error
    */
  def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
    try {
      Stream.continually(Option(tar.getNextTarEntry))
        // Read until the next entry is null (end of archive)
        .takeWhile(_.isDefined)
        // Unwrap the Options
        .flatMap(x => x)
        // Skip directory entries; only file contents are wanted
        .filter(!_.isDirectory)
        .map(e => {
          Stream.continually {
            // Read up to n bytes of the current entry
            val buffer = Array.fill[Byte](n)(-1)
            val i = tar.read(buffer, 0, n)
            (i, buffer.take(i))
          }
          // Keep going as long as the last read returned something
          .takeWhile(_._1 > 0)
          .map(_._2)
          .flatten
          .toArray
        })
        .toArray // materialize the lazy Stream BEFORE the stream is closed
    } finally {
      tar.close() // FIX: the archive stream was previously never closed (resource leak)
    }
  }

  /** Decodes raw bytes to a String using the given charset.
    *
    * FIX: the original ignored the `charset` parameter and always decoded
    * as UTF-8, making the default argument dead code.
    */
  def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
    new String(bytes, charset)
}