
I have multiple CSV files in an archive named myfolder.tar.gz. I created it this way: first I put all my files in a folder named myfolder, then I made a tar archive of that folder, and then I gzipped the tar archive.

Let us say we have 5 files.

abc_1.csv
abc_2.csv
abc_3.csv
def_1.csv
def_2.csv

I want to read only the files that match a specific filename pattern into a PySpark data frame. For example, I want to read all the abc files together.

This should not return any rows from the def files, and vice versa. Currently, I am able to read all the CSV files together by just using the spark.read.csv() function. I am also able to filter files when I keep them in a plain folder, using the pathGlobFilter parameter like this:

df = spark.read.csv("mypath", pathGlobFilter="def_[1-9].csv")

But when I try to do the same with the tar.gz archive, like:

df = spark.read.csv("myfolder.tar.gz", pathGlobFilter="def_[1-9].csv")

I am getting an error:

Unable to infer Schema for CSV.

How can I read filtered files from a .tar.gz archive?

blackbishop
supernova

1 Answer


Based on this post, you can read the .tar.gz file as a binary file, then use Python's tarfile module to extract the archive members and filter on file names using the regex def_[1-9].csv. The result is an RDD that you can convert into a data frame:

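import os
import re
import tarfile
from io import BytesIO

# extract only the files whose name matches the regex 'def_[1-9]\.csv'
def extract_files(content):
    tar = tarfile.open(fileobj=BytesIO(content), mode="r:gz")
    # member names can include the folder (e.g. "myfolder/def_1.csv"),
    # so match against the base name only
    return [tar.extractfile(x).read() for x in tar
            if x.isfile() and re.fullmatch(r"def_[1-9]\.csv", os.path.basename(x.name))]

# read the archive as a binary file, then split each extracted file into lines and fields
rdd = sc.binaryFiles("/path/myfolder.tar.gz") \
        .mapValues(extract_files) \
        .flatMap(lambda row: [x.decode("utf-8").splitlines() for x in row[1]]) \
        .flatMap(lambda row: [e.split(",") for e in row if e])

# csv_cols is assumed to be a list of column names; header rows, if any, are not removed here
df = rdd.toDF(csv_cols)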
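
To check the extraction and filtering logic without a Spark cluster, the following is a minimal sketch that runs on a small in-memory archive. The helper names build_archive and extract_rows and the sample file contents are made up for illustration. It also swaps the plain split(",") for Python's csv module, which is an assumption about what you might want if fields can contain quoted commas.

```python
import csv
import io
import os
import re
import tarfile


def build_archive(files):
    """Build an in-memory .tar.gz holding `files` (name -> text) under myfolder/."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name="myfolder/" + name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


def extract_rows(content, pattern):
    """Return parsed CSV rows from members whose base name fully matches `pattern`."""
    rows = []
    with tarfile.open(fileobj=io.BytesIO(content), mode="r:gz") as tar:
        for member in tar:
            if member.isfile() and re.fullmatch(pattern, os.path.basename(member.name)):
                text = tar.extractfile(member).read().decode("utf-8")
                # csv.reader keeps quoted fields intact, unlike a plain split(",")
                rows.extend(row for row in csv.reader(io.StringIO(text)) if row)
    return rows


# Hypothetical sample data, only for this sketch
archive = build_archive({
    "abc_1.csv": "1,a\n2,b\n",
    "def_1.csv": '3,"c,d"\n',
    "def_2.csv": "4,e\n",
})
rows = extract_rows(archive, r"def_[1-9]\.csv")
print(rows)
```

In Spark, a function like this could probably be applied with something along the lines of sc.binaryFiles(path).flatMap(lambda kv: extract_rows(kv[1], pattern)), but that part is untested here.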
blackbishop
  • Thank you. But when I run the code and try to print anything inside the extract_files function, nothing is printed. Also, rdd.count() returns 0. – supernova Feb 05 '21 at 05:00
  • @supernova I tested it with CSV files having the names provided in your question and I was able to get the desired result. Make sure the files in the `tar.gz` archive match the naming regex `def_[1-9].csv`. To print the names of the files, you can modify `extract_files` to `return [x.name for x in tar]` and collect the RDD like this: `print(sc.binaryFiles("/path/myfolder.tar.gz").mapValues(extract_files).collect())` – blackbishop Feb 05 '21 at 11:21
  • Thanks a lot. Yes, I had a problem with the regex. Also, instead of mapValues(extract_files), in PySpark I needed to call the function like this: mapValues(lambda x: extract_files(x)). Now it is working fine. Thanks again. – supernova Feb 05 '21 at 11:57
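
The comments do not say exactly what was wrong with the regex. One plausible cause, assuming the archive was created from a folder as described in the question, is that member names carry the folder prefix, so a pattern anchored at the start of the name never matches. The sketch below reproduces that with a throwaway in-memory archive. The file contents are placeholders.

```python
import io
import os
import re
import tarfile

# Build a tiny archive the way the question describes: files inside a folder.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
    for name in ["abc_1.csv", "def_1.csv"]:
        data = b"1,2\n"  # placeholder content
        info = tarfile.TarInfo(name="myfolder/" + name)
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))

# List the member names, as suggested in the comments above
with tarfile.open(fileobj=io.BytesIO(buf.getvalue()), mode="r:gz") as tar:
    names = tar.getnames()

pattern = r"def_[1-9]\.csv"
# re.match anchors at the start of the string, so the folder prefix defeats it
anchored = [n for n in names if re.match(pattern, n)]
# matching on the base name ignores the folder prefix
by_basename = [n for n in names if re.fullmatch(pattern, os.path.basename(n))]

print(names)
print(anchored)
print(by_basename)
```

If the names printed for your own archive include a prefix like this, matching on os.path.basename (or using re.search) is one way around it.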