
I get an OutOfMemoryError when processing tar.gz files larger than 1 GB in Spark.

To get past this error I tried splitting the tar.gz into multiple parts with the `split` command, only to find that each part is not a valid tar.gz archive on its own and so cannot be processed as one.

dir=/dbfs/mnt/data/temp
b=524288000                                    # 500 MB threshold in bytes
for file in /dbfs/mnt/data/*.tar.gz; do
  a=$(stat -c%s "$file")                       # file size in bytes
  if [[ "$a" -gt "$b" ]]; then
    # split into 500 MB chunks; NOTE: the parts are not standalone tar.gz archives
    split -b 500M -d --additional-suffix=.tar.gz "$file" "${file%.tar.gz}_part"
    mv "$file" "$dir"                          # move the original out of the way
  fi
done

Error when trying to process the split files:

Caused by: java.io.EOFException
    at org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream.read(GzipCompressorInputStream.java:281)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.read(TarArchiveInputStream.java:590)
    at org.apache.commons.io.input.ProxyInputStream.read(ProxyInputStream.java:98)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2001)
    at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1980)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:1957)
    at org.apache.commons.io.IOUtils.copy(IOUtils.java:1907)
    at org.apache.commons.io.IOUtils.toString(IOUtils.java:778)
    at org.apache.commons.io.IOUtils.toString(IOUtils.java:803)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:33)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:31)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
    at scala.collection.immutable.Stream.foreach(Stream.scala:595)
    at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:316)
    at scala.collection.AbstractTraversable.toMap(Traversable.scala:104)
    at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$.apply(command-2152765781429277:34)
    at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)
    at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)

I have tar.gz files that go up to 4 GB in size, and each can contain up to 7000 JSON documents whose sizes vary from 1 MB to 50 MB.

If I want to divide the large tar.gz files into smaller tar.gz files, is my only option to decompress them and then somehow recompress based on file size or file count? Is this the way?

puligun

1 Answer


Normal gzip files are not splittable, and gzipped tar archives are even harder to deal with. Spark can handle gzipped JSON files, but not gzipped tar files and not plain tar files. Spark can handle binary files up to about 2 GB each, and it can handle JSON documents that have been concatenated together.
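
For instance, a gzipped JSON Lines file can be read directly; a minimal sketch follows (the path is a placeholder, and on Databricks `spark` is already defined):

# Minimal sketch: Spark reads .json.gz directly, but each gzipped file
# is processed by a single task because gzip is not splittable.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()          # already provided on Databricks
df = spark.read.json("/mnt/data/json/*.json.gz")    # placeholder path
df.printSchema()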

I would recommend using a Pandas UDF or a .pipe() operator to process each tar-gzipped file (one per worker). Each worker would unzip, untar, and process each JSON document in a streaming fashion, never filling memory. Hopefully you have enough source files to run this in parallel and see a speed-up.
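
As a rough illustration of that approach, here is a hedged sketch using mapInPandas (Spark 3.0+) rather than a classic Pandas UDF, with Python's tarfile in streaming mode so a worker never materialises a whole archive in memory. The paths, schema, and function names are illustrative only, not the asker's actual layout:

import glob
import tarfile
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()   # already provided on Databricks

# One output row per JSON document inside an archive.
schema = StructType([
    StructField("archive", StringType()),
    StructField("entry", StringType()),
    StructField("json", StringType()),
])

def unpack_archives(batches):
    # 'batches' is an iterator of pandas DataFrames with a 'path' column of
    # local /dbfs/... paths. Entries are streamed one at a time, so memory
    # use stays roughly at the size of a single JSON document.
    for pdf in batches:
        for path in pdf["path"]:
            with tarfile.open(path, mode="r|gz") as tar:   # streaming read
                for member in tar:
                    if not member.isfile():
                        continue
                    payload = tar.extractfile(member).read().decode("utf-8")
                    yield pd.DataFrame(
                        [(path, member.name, payload)],
                        columns=["archive", "entry", "json"],
                    )

# Illustrative: list the archives through the /dbfs FUSE mount.
paths = glob.glob("/dbfs/mnt/data/*.tar.gz")
paths_df = (spark.createDataFrame([(p,) for p in paths], ["path"])
                 .repartition(max(len(paths), 1)))   # ~one archive per task

result = paths_df.mapInPandas(unpack_archives, schema)
result.write.mode("overwrite").json("/mnt/data/unpacked/")   # placeholder sink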

You might want to explore streaming approaches for delivering your compressed JSON files incrementally to ADLS Gen 2 / S3 and using Databricks Auto Loader features to load and process the files as soon as they arrive.
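
For the Auto Loader route, a minimal sketch might look like the following, assuming the JSON documents land (decompressed or as .json.gz) in a cloud/DBFS directory; all paths and the schema are placeholders:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()   # already provided on Databricks

# Placeholder schema; Auto Loader needs one for JSON unless schema
# inference is configured.
json_schema = StructType([StructField("id", StringType()),
                          StructField("body", StringType())])

stream = (spark.readStream
               .format("cloudFiles")                  # Databricks Auto Loader
               .option("cloudFiles.format", "json")
               .schema(json_schema)
               .load("/mnt/data/landing/json/"))      # placeholder input path

(stream.writeStream
       .format("delta")
       .option("checkpointLocation", "/mnt/data/checkpoints/json_ingest")  # placeholder
       .start("/mnt/data/tables/json_ingest"))                             # placeholder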

The answers to the question "How to load tar.gz files in streaming datasets?" also appear promising.

Douglas M
  • I changed the process to decompress the tar.gz archive files using bash commands, but for some archive files this could take days to complete. I started a streaming approach to process new files as they are decompressed to JSON at the source. I noticed the streaming process missed a few JSON files even with checkpointing enabled. Maybe the Pandas UDF you recommended could speed up the whole process. Should I try using the .tgz codec implemented in the link you provided? – puligun Jan 01 '21 at 02:39
  • Hmmm... days for a single 4GB .tar.gz file sounds excessive. I'd do some more testing. AWS S3 CP is very fast because it uses parallel threads to speed copy times. Using I3/DSv2 instances (they have local NVMe drives sized in the TBs) would help the bash script. I faced a similar problem recently, but it didn't have tar files in the mix. My customer had 50GB gzipped CSV files. This codec: https://github.com/nielsbasjes/splittablegzip saved the day by allowing parallel execution. More cores but much less time. An analogous codec that supported tar streams should help you. – Douglas M Jan 01 '21 at 03:39
  • Actually it takes up to 30 mins for a 4GB tar.gz file, and I have about 750 tar files (min: 100MB, max: 4GB) which total 1TB. It takes days to decompress the full 1TB. We are using Azure, and the cluster is made of DS4v2 workers (min 10, max 20) and a D32sv3 driver (128GB RAM, 32 cores). Previously I was using a UDF in Scala based on [the link](https://alexwlchan.net/2019/09/unpacking-compressed-archives-in-scala/). The UDF returns a map of key (JSON file name) and value (actual JSON content) pairs, i.e. up to 7000 pairs for each tar.gz file. However, it works well only for tar.gz files under 500MB. – puligun Jan 01 '21 at 04:27
  • `pigz` may be faster than `tar -xzf`: https://serverfault.com/questions/270814/fastest-way-to-extract-tar-gz. It will run faster on a compute-optimized instance type, with the file on a local NVMe filesystem. Cluster -> Metrics -> Ganglia will help show where the bottlenecks are (CPU/IO/network). The UDF looks useful, but it sounds like it processes everything into the same Spark partition? – Douglas M Jan 01 '21 at 21:45
  • The original question of splitting .tar.gz was addressed. You'd have to re-write the large .tar.gz as multiple smaller .tar.gz files. – Douglas M Jan 01 '21 at 22:04
  • Yes, the UDF tries to process everything in the same partition and throws the OOM exception. How can I re-write the large .tar.gz as multiple smaller tar.gz files? – puligun Jan 02 '21 at 01:09
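
Following up on the last two comments, here is a hedged sketch of one way to re-write a large .tar.gz as multiple smaller .tar.gz files: stream-read the source archive with Python's tarfile and roll over to a new output archive once a size threshold is passed. The paths and the 500 MB threshold (measured on uncompressed entry sizes) are illustrative only:

import os
import tarfile

SRC = "/dbfs/mnt/data/big_archive.tar.gz"    # illustrative input path
OUT_DIR = "/dbfs/mnt/data/split"             # illustrative output directory
CHUNK_BYTES = 500 * 1024 * 1024              # roll over after ~500 MB (uncompressed)

os.makedirs(OUT_DIR, exist_ok=True)
part, written, out = 0, 0, None

with tarfile.open(SRC, mode="r|gz") as src:              # streaming read
    for member in src:
        if not member.isfile():
            continue
        if out is None or written >= CHUNK_BYTES:
            if out is not None:
                out.close()
            part += 1
            out = tarfile.open(
                os.path.join(OUT_DIR, f"part{part:03d}.tar.gz"), mode="w:gz")
            written = 0
        # Copy the entry straight from the source stream into the current part.
        out.addfile(member, src.extractfile(member))
        written += member.size

if out is not None:
    out.close()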