
I have a directory containing compressed directories, each of which contains a csv file and some other files I don't need. I would like to read these csv files into a Spark dataframe.

If I merely had a directory containing csv files, whether gzip-compressed or not, this would be straightforward. In pyspark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()
df = spark.read.csv("path/to/csv/files/*csv")
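For what it's worth, a slightly fuller version of that same call might look like the sketch below; the header and inferSchema options are just illustrative assumptions about the data, not something the approach requires:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

# Spark reads .csv and .csv.gz files alike through this reader
df = (spark.read
      .option("header", "true")       # assume the first line of each file is a header
      .option("inferSchema", "true")  # let Spark guess the column types
      .csv("path/to/csv/files/*csv"))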

If, however, instead of csv files I have compressed directories (tar.gz archives), I can use a solution modified from Read whole text files from a compression in Spark:

import tarfile
from io import BytesIO

def extractFiles(bytes):
    # open the tar.gz archive from the raw bytes that binaryFiles hands us
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    # keep only regular csv members, skipping macOS "._" metadata entries
    return [tar.extractfile(x).read() for x in tar
            if all([x.isfile(), '._' not in x.name, 'csv' in x.name])]

sc = spark.sparkContext  # reuse the session created above

rdd = (sc.binaryFiles("path/to/compressed/directories/*")
          .mapValues(extractFiles)
          .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
df = (rdd.values()
        .map(lambda x: ''.join(x))         # concatenate the csv contents per archive
        .map(lambda x: x.strip('\n'))      # drop leading/trailing newlines
        .flatMap(lambda x: x.split('\n'))  # one record per csv line
        .map(lambda x: x.split(','))       # naive comma split into columns
        .toDF())
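One wrinkle: toDF() with no arguments gives the columns generic names (_1, _2, ...). If named columns are needed they can be set afterwards; the names below are hypothetical placeholders rather than my actual header:

# rename the auto-generated _1, _2, ... columns (names are placeholders)
df = df.toDF("col_a", "col_b", "col_c")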

This works (at least on small datasets so far), but it seems overly complicated given that such a simple solution exists for the non-tarfile case. Is there a more elegant (and perhaps scalable) way to do this?

