9

I have a zipped file containing multiple text files. I want to read each of the files and build a List of RDDs containing the content of each file.

val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")

will just read the entire file as one input, but how do I iterate through each entry of the zip and then save its content in an RDD using Spark?

I am fine with Scala or Python.

Possible solution in Python using Spark -

import zipfile

# open the archive and list the entries it contains
archive = zipfile.ZipFile(archive_path, 'r')
file_paths = archive.namelist()
for file_path in file_paths:
    # derive an id from the file name (last path segment, before the first '_')
    urls = file_path.split("/")
    urlId = urls[-1].split('_')[0]
Abhishek Choudhary

5 Answers

10

Apache Spark default compression support

I have written all the necessary theory in another answer, which you might want to refer to: https://stackoverflow.com/a/45958182/1549135

Read zip containing multiple files

I followed the advice given by @Herman and used ZipInputStream. This gave me the solution below, which returns an RDD[String] of the zip content.

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.endsWith(".zip")) {
      // read the zip as binary and unpack every entry into lines
      sc.binaryFiles(path, minPartitions)
        .flatMap { case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          // iterate over the zip entries; close the stream after the last one
          Stream.continually(zis.getNextEntry)
                .takeWhile {
                  case null => zis.close(); false
                  case _ => true
                }
                .flatMap { _ =>
                  // read the current entry line by line
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
                }
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

Simply use it by importing the implicit class and calling the readFile method on SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)
Atais
  • You are not closing the connections. – TriCore Oct 29 '17 at 06:14
  • @Programmer I tried closing it but then this method fails on me. So I left it to Spark. – Atais Oct 29 '17 at 10:58
  • 2
    @Atais Spark didn't close the streams in my case. I attempted to read thousands of files from S3 and failed because the connection thread pool was exhausted, but it worked once I closed the streams in the code. Anyhow, it's always a good idea to do cleanup promptly. – TriCore Oct 29 '17 at 16:59
  • Can you create an answer explaining how you handled it? Or post it somewhere? – Atais Oct 29 '17 at 17:01
  • I am having other issues. Will post once done. It's something like this https://stackoverflow.com/questions/35746539/close-a-stream – TriCore Oct 29 '17 at 17:28
  • @Programmer please check my update. Closing the outer stream helps? – Atais Oct 30 '17 at 08:29
  • I am facing a problem reading this way, as the data inside the text file is very large in my case. I am getting a heap space exception due to the Arrays.copyOfRange method, which copies the array when it is full and a new object needs to be added. So, is there a workaround instead of reading the zip files using sc.binaryFiles? Please let me know if more info is needed. – sri harsha Jun 13 '18 at 17:50
  • @Atais it would be nice if it gave the file name too in the output, which would let me convert the RDD to a DF and then filter on the file name using Spark's virtual column input_file_name() – sri hari kali charan Tummala Jun 14 '19 at 19:55
  • I resolved it , .map { x ⇒ val filename1 = x.getName scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString(s"~${filename1}\n")+s"~${filename1}" } #::: { zipInputStream.close; Stream.empty[String] } – sri hari kali charan Tummala Jun 14 '19 at 22:46
  • @sriharikalicharanTummala Hey, thanks for your help with this. I think I am trying to do something similar to you and extract each file's contents with its respective file name. Once you've added that line that gets the file name, how did you proceed? – Leyth G Jan 16 '20 at 15:52
4

If you are reading binary files use sc.binaryFiles. This will return an RDD of tuples containing the file name and a PortableDataStream. You can feed the latter into a ZipInputStream.
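
For illustration, here is a minimal sketch of that idea (this is not Herman's code; the wildcard path and the UTF-8 charset are assumptions, and stream closing is omitted for brevity, see the comments under the answer above):

import java.util.zip.ZipInputStream
import scala.io.Source

// each (fileName, PortableDataStream) pair from binaryFiles is unpacked with a
// ZipInputStream and every entry of the zip is turned into lines
val lines = sc.binaryFiles("/Volumes/work/data/kaggle/dato/test/*.zip")
  .flatMap { case (zipPath, stream) =>
    val zis = new ZipInputStream(stream.open())
    Stream.continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .flatMap(_ => Source.fromInputStream(zis, "UTF-8").getLines().toList)
  }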

Herman
2

Here's a working version of @Atais' solution (which needed enhancing by closing the streams):

import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.toLowerCase.contains("zip")) {
      sc.binaryFiles(path, minPartitions)
        .flatMap {
          case (zipFilePath, zipContent) ⇒
            val zipInputStream = new ZipInputStream(zipContent.open())
            Stream.continually(zipInputStream.getNextEntry)
              .takeWhile(_ != null)
              .map { _ ⇒
                // read the whole current entry as a single string
                scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString("\n")
              } #::: { zipInputStream.close; Stream.empty[String] }
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

Then all you have to do is the following to read a zip file:

sc.readFile(path)
mahmoud mehdi
  • How can I add the file name to the output so I can filter on it? Imagine one zip file has multiple schema files; I could use Spark's input_file_name virtual column if I could get the file name into the RDD. @mahmoud mehdi – sri hari kali charan Tummala Jun 14 '19 at 20:37
  • This will give file names too: .map { x ⇒ val filename1 = x.getName scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString(s"~${filename1}\n")+s"~${filename1}" } #::: { zipInputStream.close; Stream.empty[String] } – sri hari kali charan Tummala Jun 14 '19 at 22:47
1

This only reads the first line of each entry. Can anyone share your insights? I am trying to read a CSV file which is zipped and create a JavaRDD for further processing.

JavaPairRDD<String, PortableDataStream> zipData =
        sc.binaryFiles("hdfs://temp.zip");
JavaRDD<Record> newRDDRecord = zipData.flatMap(
    new FlatMapFunction<Tuple2<String, PortableDataStream>, Record>() {
        public Iterator<Record> call(Tuple2<String, PortableDataStream> content) throws Exception {
            List<Record> records = new ArrayList<Record>();
            ZipInputStream zin = new ZipInputStream(content._2.open());
            ZipEntry zipEntry;
            while ((zipEntry = zin.getNextEntry()) != null) {
                if (!zipEntry.isDirectory()) {
                    InputStreamReader streamReader = new InputStreamReader(zin);
                    BufferedReader bufferedReader = new BufferedReader(streamReader);
                    // only the first line of each entry is read here
                    String line = bufferedReader.readLine();
                    String[] fields = new CSVParser().parseLineMulti(line);
                    Record sd = new Record(TimeBuilder.convertStringToTimestamp(fields[0]),
                            getDefaultValue(fields[1]),
                            getDefaultValue(fields[22]));
                    records.add(sd);
                }
            }
            return records.iterator();
        }
    });
Anand
-1

Here is another working solution which also gives out the file name, which can later be split off and used to create separate schemas.

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

    def readFile(path: String,
                 minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

      if (path.toLowerCase.contains("zip")) {

        sc.binaryFiles(path, minPartitions)
          .flatMap {
            case (zipFilePath, zipContent) ⇒
              val zipInputStream = new ZipInputStream(zipContent.open())
              Stream.continually(zipInputStream.getNextEntry)
                .takeWhile(_ != null)
                .map { x ⇒
                  // append the entry's file name after a '~' separator so it can be recovered later
                  val filename1 = x.getName
                  scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString(s"~${filename1}\n") + s"~${filename1}"
                } #::: { zipInputStream.close; Stream.empty[String] }
          }
      } else {
        sc.textFile(path, minPartitions)
      }
    }
  }

The full code is here:

https://github.com/kali786516/Spark2StructuredStreaming/blob/master/src/main/scala/com/dataframe/extraDFExamples/SparkReadZipFiles.scala
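
For example, a hedged sketch of how one might split the file name back out of each record produced by this readFile (the '~' separator is the format used in the code above; the path, the "5.csv" file name, and the variable names are just illustrations):

// every line produced above ends with "~<entry file name>"
val withNames = sc.readFile("/path/to/archive.zip")       // requires the implicit class above
  .flatMap(_.split("\n"))                                  // one element per line
  .map { line =>
    // assumes '~' does not occur in the data itself
    val idx = line.lastIndexOf('~')
    (line.substring(idx + 1), line.substring(0, idx))      // (fileName, lineContent)
  }

// keep only the lines that came from one particular file inside the zip
val onlyOneFile = withNames.filter { case (fileName, _) => fileName == "5.csv" }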