15

I have zip files that I would like to open 'through' Spark. I can open .gzip files without a problem because of Hadoop's native codec support, but am unable to do so with .zip files.

Is there an easy way to read a zip file in your Spark code? I've also searched for zip codec implementations to add to the CompressionCodecFactory, but have been unsuccessful so far.

JeffLL

7 Answers

23

There was no solution with Python code here, and I recently had to read zips in PySpark. While searching for how to do that I came across this question, so hopefully this will help others.

import io
import zipfile

def zip_extract(x):
    # x is a (path, bytes) pair from binaryFiles; wrap the bytes in an
    # in-memory buffer so ZipFile can seek within it
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    # map each archived file name to its (binary) contents
    return {name: file_obj.open(name).read() for name in file_obj.namelist()}


zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()

In the above code I return a dictionary with each file name in the zip as a key and the text data of each file as the value. You can change it however you want to suit your purposes.
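For example, instead of collect()ing everything to the driver, a small sketch (reusing the same zip_extract and path) that flattens the per-zip dictionaries into one distributed RDD of (filename, contents) pairs:

# keeps the data distributed rather than collecting to the driver
file_pairs = (sc.binaryFiles("hdfs:/Testing/*.zip")
              .map(zip_extract)
              .flatMap(lambda d: d.items()))  # -> (name_in_zip, raw_bytes)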

TrigonaMinima
  • This worked fine for my not-so-large zip files. The other fun part is that once you have the binary contents of the unpacked zip file, there's no easy way to get them to HDFS or S3. What I had to do was write them to a local file using Python and then move them to S3 from there. – pcv Apr 03 '19 at 14:00
  • Is there a way I can apply the same logic to `bz2` files? I am trying but not able to convert the RDD to `BytesIO`. – Gaurang Shah Aug 23 '21 at 14:11
  • @GaurangShah, there is. You can use `bz2.decompress` to decompress bz2 in memory. For example `decompressed_x = bz2.decompress(x[1])`. – LazyGoose Jan 29 '22 at 10:47
  • Does this work for `.gz` files too? Also, I'm not able to display it as a dataframe even after converting to a dataframe, as I have a text file. – Scope Feb 14 '22 at 20:00
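Following up on the bz2 discussion in the comments above, a minimal sketch of that route (the path is a placeholder): bz2 is a stream codec rather than an archive, so the whole payload decompresses with a single bz2.decompress call and no BytesIO wrapper is needed.

import bz2

# binaryFiles yields (path, raw_bytes) pairs, as with the zip case
raw = sc.binaryFiles("hdfs:/Testing/*.bz2")
decompressed = raw.map(lambda x: (x[0], bz2.decompress(x[1])))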
6

@user3591785 pointed me in the correct direction, so I marked his answer as correct.

For a bit more detail: I searched for "ZipFileInputFormat Hadoop" and came across this link: http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

Taking the ZipFileInputFormat and its helper ZipFileRecordReader class, I was able to get Spark to open and read the zip file perfectly.

    JavaPairRDD<Text, Text> rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

The result was a map with one element: the file name as the key and the content as the value. So I needed to transform this into a JavaPairRDD. I'm sure you could replace Text with BytesWritable if you want, and replace the ArrayList with something else, but my goal was to first get something running.

JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {

    @Override
    public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
        List<Tuple2<String, String>> newList = new ArrayList<Tuple2<String, String>>();

        // the Text value holds the decompressed contents of one file in the zip
        InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));

        String line;

        while ((line = br.readLine()) != null) {
            // key each output line by its first tab-separated field
            Tuple2<String, String> newTuple = new Tuple2<String, String>(line.split("\\t")[0], line);
            newList.add(newTuple);
        }
        return newList;
    }
});
JeffLL
5

Please try the code below, using the newAPIHadoopRDD API:

sparkContext.newAPIHadoopRDD(
    hadoopConf,
    InputFormat.class,
    ImmutableBytesWritable.class, Result.class)
Tinku
4

I had a similar issue and solved it with the following code:

sparkContext.binaryFiles("/pathToZipFiles/*")
  .flatMap { case (zipFilePath, zipContent) =>
    val zipInputStream = new ZipInputStream(zipContent.open())

    Stream.continually(zipInputStream.getNextEntry)
      .takeWhile(_ != null)
      .flatMap { zipEntry => ??? } // process each entry here; see the complete version in @Atais's answer below
  }
  • If there are multiple files in a zip file, I am able to read only the first file. Can you please provide a code snippet for unzipping multiple files? I have also referred to https://github.com/eric-maynard/spark-ETL/blob/master/scala/Example%201.scala – Vaijnath Polsane Apr 26 '17 at 14:25
  • @VaijnathPolsane please check my answer – Atais Aug 30 '17 at 11:52
4

This answer only collects the previous knowledge and adds my own experience.

ZipFileInputFormat

I tried following the @Tinku and @JeffLL answers, using the imported ZipFileInputFormat together with the sc.newAPIHadoopFile API, but this did not work for me. And I do not know how I would put the com-cotdp-hadoop lib on my production cluster; I am not responsible for the setup.

ZipInputStream

@Tiago Palma gave good advice, but he did not finish his answer and I struggled for quite some time to actually get the decompressed output.

Before I was able to do so, I had to work through all the theoretical aspects, which you can find in my answer: https://stackoverflow.com/a/45958182/1549135

But the missing part of the mentioned answer is reading the ZipEntry:

import java.util.zip.ZipInputStream
import java.io.{BufferedReader, InputStreamReader}

sc.binaryFiles(path, minPartitions)
  .flatMap { case (name: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open)
    Stream.continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .flatMap { _ =>
        // read the current entry line by line from the shared zip stream
        val br = new BufferedReader(new InputStreamReader(zis))
        Stream.continually(br.readLine()).takeWhile(_ != null)
      }
  }
Atais
2
Using the API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class), the file name should be passed in using the conf:

conf = (new Job().getConfiguration())
conf.set(PROPERTY_NAME from your input formatter, "Zip file address")
sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)

Please find the PROPERTY_NAME in your input format class for setting the path.
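For illustration, a PySpark sketch of the same idea; the class name assumes the com-cotdp-hadoop ZipFileInputFormat discussed in the other answers, and the property name assumes it inherits the standard FileInputFormat input path setting:

# sketch only: class and property names are assumptions, not verified
rdd = sc.newAPIHadoopRDD(
    "com.cotdp.hadoop.ZipFileInputFormat",   # assumed input format class
    "org.apache.hadoop.io.Text",             # key class (file name), as in @JeffLL's answer
    "org.apache.hadoop.io.Text",             # value class (file contents)
    conf={"mapreduce.input.fileinputformat.inputdir": "hdfs:/Testing/input.zip"})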

Tinku
  • Using the above code, I was able to execute it successfully up to 56 MB, but it fails for a file of size 338 MB; I end up with the exception java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at hydrograph.engine.spark.zipread.ZipFileRecordReader.nextKeyValue(ZipFileRecordReader.java:105)....... – Vaijnath Polsane Apr 26 '17 at 14:18
0

Try:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.text("yourGzFile.gz")
Memphis Meng
    While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value. – mufazmi May 21 '21 at 18:25
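As context: this works because Hadoop's compression codecs decompress .gz transparently, which is exactly the behavior the question contrasts with .zip; a zip archive still needs the binaryFiles route from the top answer. A minimal sketch of the distinction (file names are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# .gz: decompressed transparently by Hadoop's codecs
gz_df = spark.read.text("yourGzFile.gz")

# .zip: an archive, not a stream codec, so read.text cannot decode it;
# pair sc.binaryFiles with Python's zipfile module instead
zip_rdd = spark.sparkContext.binaryFiles("yourZipFile.zip")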