
I have a number of .zip files on S3 that I want to process and extract some data from. Each zip file contains a single JSON file. Spark can read .gz files directly, but I haven't found a way to read the data inside .zip files. Can someone help me figure out how to process large zip files in Spark using Python? I came across options like newAPIHadoopFile, but had no luck with them, nor did I find a way to use them in PySpark. Please note that the zip files are larger than 1 GB, and some are as large as 20 GB.

Below is the code I used:

import io
import zipfile

file_name = "s3 file path for zip file"

def zip_extract(x):
    # x is a (path, bytes) pair produced by sc.binaryFiles
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    # Map each archive member name to its fully read contents
    return {name: file_obj.open(name).read() for name in file_obj.namelist()}


zips = sc.binaryFiles(file_name)
files_data = zips.map(zip_extract)

But it's failing with the error below. The instance I'm using is r4.2xlarge.

Exit code: 52
Stack trace: ExitCodeException exitCode=52: 
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
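
For context, this is roughly how files_data was meant to be consumed once the extraction works. It is only a sketch: it assumes each archive member is a single UTF-8-encoded JSON document small enough to deserialize on an executor.

import json

# Sketch only: flatten each per-zip dict into its member files' raw bytes,
# then parse each member as one JSON document (assumes UTF-8 encoding and
# that each document fits comfortably in executor memory).
json_docs = (files_data
             .flatMap(lambda d: d.values())
             .map(lambda raw: json.loads(raw.decode("utf-8"))))

print(json_docs.count())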

1 Answer


I read the contents of the zip file in chunks and processed those chunks with Spark. This worked for me and let me read zip files larger than 10 GB. Below is an example:

import zipfile

max_data_length = 10000
z = zipfile.ZipFile(zip_file)
data = []
counter = 1
with z.open(z.infolist()[0]) as f:
    line_counter = 0
    for line in f:
        # Append file contents to the list
        data.append(line)
        line_counter = line_counter + 1
        # Once the chunk reaches max_data_length lines, hand it to Spark,
        # then reset the counters and the data list
        if not line_counter % max_data_length:
            # Spark processing like:
            df_rdd = spark.sparkContext.parallelize(data)

            # Reset counters and data list
            counter = counter + 1
            line_counter = 0
            data = []

# Process any remaining lines that didn't fill a complete chunk
if data:
    df_rdd = spark.sparkContext.parallelize(data)
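
Each chunk's RDD can then be turned into a DataFrame and, for example, appended to an output location. A minimal sketch, assuming the lines are UTF-8 JSON records and using a hypothetical output path:

# Sketch only: decode the raw byte lines, parse them as JSON with Spark,
# and append the chunk to a (hypothetical) Parquet output location.
chunk_df = spark.read.json(df_rdd.map(lambda raw: raw.decode("utf-8")))
chunk_df.write.mode("append").parquet("s3://my-bucket/output/")  # hypothetical path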
  • can you elaborate further? I see that the control never goes into the if condition and hence the data is never added to the rdd! If in case the data is huge and it does go in, the counter is reset again, and the end data again is not written to the rdd. Have you figured out the correct solution? – Djeah Jul 18 '21 at 18:59