19

I have a large (about 85 GB compressed) gzipped file on S3 that I am trying to process with Spark on AWS EMR (right now with an m4.xlarge master instance and two m4.10xlarge core instances, each with a 100 GB EBS volume). I am aware that gzip is a non-splittable file format, and I've seen it suggested that one should repartition the compressed file because Spark initially gives an RDD with one partition. However, after running

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields]
scala> raw.count()

and taking a look at the Spark application UI, I still see only one active executor (the other 14 are dead) with one task, and the job never finishes (or at least I've not waited long enough for it to).

  • What is going on here? Can someone help me understand how Spark is working in this example?
  • Should I be using a different cluster configuration?
  • Unfortunately, I have no control over the mode of compression, but is there an alternative way of dealing with such a file?
user4601931

4 Answers

14

If the file format is not splittable, then there's no way to avoid reading the file in its entirety on one core. To parallelize work, you have to be able to assign independent chunks of it to different computers. In the gzip case, suppose you divide the file into 128 MB chunks. Decompressing the nth chunk depends on position information from the (n-1)th chunk, which depends on the (n-2)th chunk, and so on back to the first.

If you want to parallelize, you need to make this file splittable. One way is to unzip it and process it uncompressed, or you can unzip it, split it into several files (one file for each parallel task you want), and gzip each file.
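
The "unzip, split, and gzip each file" route can be sketched in Python as follows. This is only a minimal illustration, assuming the file has already been copied to local disk; the function name `split_gzip`, the `part-NNNNN.gz` naming, and the default chunk size are made up for the example, and uploading the chunks back to S3 is left out.

```python
import gzip
import itertools
from pathlib import Path


def split_gzip(src, out_dir, lines_per_chunk=1_000_000):
    """Stream-decompress `src` and rewrite it as several smaller gzip files.

    Lines are copied one at a time, so memory use stays small no matter
    how large the input is. Returns the list of chunk paths.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    chunk_paths = []
    with gzip.open(src, "rb") as f_in:
        for index in itertools.count():
            # Lazily take the next batch of lines from the decompressed stream.
            batch = itertools.islice(f_in, lines_per_chunk)
            first = next(batch, None)
            if first is None:
                break  # input exhausted
            chunk_path = out_dir / f"part-{index:05d}.gz"
            with gzip.open(chunk_path, "wb") as f_out:
                f_out.write(first)
                f_out.writelines(batch)
            chunk_paths.append(chunk_path)
    return chunk_paths
```

Spark can then read the whole output directory, getting at least one partition per chunk file.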

Tim
  • I was under the impression that Spark is decompressing the file first before repartitioning it. Is this not the case? What are the four links that I provided talking about, then? – user4601931 Nov 08 '16 at 18:00
  • Yes, Spark is decompressing the file first in its entirety (80G on one core) before it can shuffle it to increase parallelism. – Tim Nov 08 '16 at 19:06
  • Okay, thank you. Do you think my cluster will even be able to handle this task? If so, if I want to decompress the whole file, repartition it, and then do further processing, do you think setting `spark.dynamicAllocation.enabled=true` will ensure that I get one executor (with as much memory as possible) to do the decompression and then more executors (with less memory but many cores) after to do the processing? – user4601931 Nov 08 '16 at 19:39
  • This is something you don't need to (and shouldn't) do in Spark. Just do something like `zcat file | split -l 1000000` to produce many new files, then recompress each one and go from there. – Tim Nov 08 '16 at 20:01
  • The gzip file is not easily splittable, but it still can be parallel processed with Spark, increasing end-to-end throughput. Check out the brilliant `SplittableGZip` codec which trades CPU hours for wall clock gains: https://github.com/nielsbasjes/splittablegzip/ (and answer below) – Douglas M Oct 11 '21 at 00:41
5

Spark cannot parallelize reading a single gzip file.

The best you can do is split it into chunks that are each gzipped.

However, Spark is really slow at reading gzip files. You can do this to speed it up:

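import gzip
# One element per file name, spread over 100 partitions; each task opens its own files.
file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda path: gzip.open(path).readlines())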

Going through Python is twice as fast as using the native Spark gzip reader.
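
One caveat with `readlines()` is that it loads each file's full contents into memory inside the task. A possible variation, sketched here with a hypothetical helper name `iter_gzip_lines` and an assumed UTF-8 encoding, is to pass `flatMap` a generator so lines are produced lazily:

```python
import gzip


def iter_gzip_lines(path):
    """Yield decoded lines from one gzip file without loading it all at once."""
    # Assumption: the data is UTF-8 text; adjust the encoding if it is not.
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\n")
```

It would be used as `sc.parallelize(list_of_files, 100).flatMap(iter_gzip_lines)`. Either way, this assumes every path in `list_of_files` can be opened directly from the executors, for example on a shared filesystem.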

hcvst
Mathieu Longtin
1

The solution I've used is a decompression codec: SplittableGzipCodec by Niels Basjes. This codec feeds the same file to multiple Spark tasks. Each task "fast forwards" to a specific offset in the gzip file, discarding the data before it, and then begins decompressing its own portion from there. Running multiple tasks on the same gzip file significantly decreases the wall clock time and increases the chances that the gunzip succeeds, at the cost of increasing the total core hours used. Brilliant. I've tested it on 20-50 GB files.

The Spark solution is described in detail here: https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md

# splittable-gzip.py
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # If you want to change the split size, you need to use this config
        # instead of mapreduce.input.fileinputformat.split.maxsize.
        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    print(
        spark.read
        # You can also specify this option against the SparkSession.
        .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .csv(...)
        .count()
    )
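
To get a feel for the CPU-versus-wall-clock trade, here is a rough back-of-the-envelope sketch. It rests on an assumption, not a measurement: that each task has to decompress from the start of the file through the end of its own split. The function name `splittable_gzip_cost` is made up for the illustration, and the sizes are the 85 GB file from the question with the 1000 MiB split size from the config above.

```python
import math


def splittable_gzip_cost(file_bytes, split_bytes):
    """Estimate the task count and how many times over the file is read.

    Assumption: task i decompresses everything from byte 0 through the
    end of split i, discarding the part that belongs to earlier splits.
    """
    tasks = math.ceil(file_bytes / split_bytes)
    total_read = sum(min(i * split_bytes, file_bytes) for i in range(1, tasks + 1))
    return tasks, total_read / file_bytes


tasks, read_ratio = splittable_gzip_cost(85 * 1024**3, 1000 * 1024**2)
print(f"{tasks} tasks, about {read_ratio:.0f}x the file decompressed in total")
```

Under that assumption, the last task still decompresses the entire file, so the gain comes from parallelizing the parsing and processing that follow decompression rather than the decompression itself.
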
Douglas M
0

I have faced this problem and here is the solution.

The best way to approach this problem is to decompress the .gz file before the Spark batch run, then have Spark read the decompressed file so that it can be processed in parallel.

Code to decompress the .gz file:

import gzip
import shutil
# Read through gzip (decompressing) and write plain bytes to the output file.
with gzip.open('file.txt.gz', 'rb') as f_in, open('file.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)
Amol More