I have a directory containing compressed directories, each of which contains a csv file and some others I don't need. I would like to read these csv files into a Spark dataframe.
If I merely had a directory containing csv files, whether gzip compressed or not, this would be straightforward. In pyspark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()
df = spark.read.csv("path/to/csv/files/*csv")
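For what it's worth, Spark's csv reader decompresses gzip files transparently based on their extension, so plain and gzipped files can even be read in a single call (the paths and options below are just placeholders):

# .csv and .csv.gz files are handled alike; Spark decompresses by file extension
df = spark.read.csv(["path/to/csv/files/*.csv", "path/to/csv/files/*.csv.gz"],
                    header=True, inferSchema=True)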
If, however, instead of csv files I have gzipped tar archives ("compressed directories"), I can use a solution adapted from Read whole text files from a compression in Spark:
import tarfile
from io import BytesIO

sc = spark.sparkContext

def extractFiles(bytes):
    # open the gzipped tar archive from its raw bytes and return the contents of the
    # csv members, skipping directories and macOS "._" resource-fork entries
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar
            if all([x.isfile(), '._' not in x.name, 'csv' in x.name])]

rdd = (sc.binaryFiles("path/to/compressed/directories/*")
       .mapValues(extractFiles)
       .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))

df = (rdd.values()
      .map(lambda x: ''.join(x))          # concatenate the csvs from each archive
      .map(lambda x: x.strip('\n'))       # trim leading/trailing newlines
      .flatMap(lambda x: x.split('\n'))   # one record per line
      .map(lambda x: x.split(','))        # naive split on commas
      .toDF())
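One caveat with the above: toDF() without arguments produces autogenerated column names (_1, _2, ...), and any header rows inside the csv files end up as ordinary data rows. A minimal sketch of how I could handle that, assuming every extracted csv carries an identical header line:

# assumes each csv starts with the same header row (an assumption about my data,
# not something the archives guarantee)
lines = (rdd.values()
         .map(lambda xs: ''.join(xs).strip('\n'))
         .flatMap(lambda x: x.split('\n')))
header = lines.first()
df = (lines.filter(lambda line: line != header)   # drop the repeated header rows
      .map(lambda line: line.split(','))
      .toDF(header.split(',')))                   # use the header for column names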
This tarfile-based approach works (at least on small datasets so far), but it seems overly complicated given that such a simple solution exists for the non-tarfile case. Is there a more elegant (and perhaps more scalable) way to do this?