
I have a tar.gz file that contains multiple files. The hierarchy looks like this. My intention is to read the tar.gz file and filter out the contents of b.tsv, as it is static metadata, while all the other files are actual records.

gzfile.tar.gz
|- a.tsv
|- b.tsv
|- thousand more files.

Using the PySpark loader, I'm able to load the file into a dataframe. I used the command:

from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("Loading Gzip Files").\
        getOrCreate()
input = spark.read.load('/Users/jeevs/git/data/gzfile.tar.gz',\
          format='com.databricks.spark.csv',\
          sep='\t')

With the intention of filtering, I added the filename column:

from pyspark.sql.functions import input_file_name
input = input.withColumn("filename", input_file_name())

Which now generates the data like so:

|_c0 |_c1 |filename |
|---|---|---|
|b.tsv0000666000076500001440035235677713575350214013124 0ustar  netsaintusers1|Lynx 2.7.1|file:///Users/jeevs/git/data/gzfile.tar.gz|
|2|Lynx 2.7|file:///Users/jeevs/git/data/gzfile.tar.gz|

Of course, the filename field is populated with the tar.gz file itself, making that approach useless. A more irritating problem is that _c0 is getting populated with the filename + garbage + the first row's values.

At this point, I'm wondering if the file read itself is getting weird because it is a tar.gz file. When we did v1 of this processing (Spark 0.9), we had another step that loaded the data from S3 onto an EC2 box, extracted it, and wrote it back to S3. I'm trying to get rid of those steps.

Thanks in advance!

Jeevs
    Have you gone through this: [Read whole text files from a compression in Spark](https://stackoverflow.com/questions/36604145/read-whole-text-files-from-a-compression-in-spark). The `extractFiles()` method given in the accepted answer gives you a place to filter files. Worth a try. – ernest_k Feb 07 '20 at 05:21
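
For reference, a minimal sketch of that approach, assuming the archive fits in memory on an executor and the members are UTF-8 TSV text (extract_records is just an illustrative name, not part of any API):

import io
import tarfile

def extract_records(pair):
    # pair is (archive_path, raw_bytes) as produced by binaryFiles
    _, content = pair
    with tarfile.open(fileobj=io.BytesIO(content), mode='r:gz') as tar:
        for member in tar.getmembers():
            # skip directories and the static metadata file b.tsv
            if not member.isfile() or member.name.endswith('b.tsv'):
                continue
            text = tar.extractfile(member).read().decode('utf-8')
            for line in text.splitlines():
                yield (member.name, line)

records = spark.sparkContext \
    .binaryFiles('/Users/jeevs/git/data/gzfile.tar.gz') \
    .flatMap(extract_records)
df = records.toDF(['filename', 'value'])   # split 'value' on tabs afterwards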

1 Answer


Databricks does not support direct *.tar.gz iteration. In order to process the files, they have to be unzipped into a temporary location first. Databricks supports bash, which can do the job:

%sh find $source -name '*.tar.gz' -exec tar -xvzf {} -C $destination \;

The command above will unzip all files with the extension *.tar.gz from the source to the destination location. If the path is passed via dbutils.widgets, or is static in %scala or %pyspark, it must be declared as an environment variable. This can be achieved in %pyspark:

import os
os.environ['source'] = '/dbfs/mnt/dl/raw/source/'
# $destination must be exported the same way before running the %sh cell

Use the following method to load a file, assuming the content is a *.csv file:

DF = spark.read.format('csv') \
        .options(header='true', inferSchema='true') \
        .option("mode", "DROPMALFORMED") \
        .load('/mnt/dl/raw/source/sample.csv')
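
To then drop the static metadata from b.tsv (the original goal), the extracted files can be loaded as a whole directory and filtered on input_file_name() — a sketch, assuming the files were extracted to /mnt/dl/raw/source/ as above:

from pyspark.sql.functions import col, input_file_name

# load every extracted TSV, tag each row with its source file, and drop rows from b.tsv
df = spark.read.format('csv') \
        .option('sep', '\t') \
        .load('/mnt/dl/raw/source/') \
        .withColumn('filename', input_file_name()) \
        .filter(~col('filename').endswith('b.tsv'))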