
I have a huge list of GZip files which need to be converted to Parquet. Because GZip is not a splittable compression format, the conversion of a single file cannot be parallelized.

However, since I have many, is there a relatively easy way to let every node do a part of the files? The files are on HDFS. I assume that I cannot use the RDD infrastructure for the writing of the Parquet files because this is all done on the driver as opposed to on the nodes themselves.

I could parallelize the list of file names and write a function that handles the Parquet conversion locally and saves the result back to HDFS, but I wouldn't know how to do that. I feel like I'm missing something obvious, thanks!

This was marked as a duplicate question, but that is not the case. I am fully aware that Spark can read these files in as RDDs without having to worry about the compression. My question is about how to parallelize converting these files to structured Parquet files.

If I knew how to interact with Parquet files without Spark itself I could do something like this:

def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    write_csv_to_parquet_on_hdfs(gzipped_csv, file_to)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

That would allow me to parallelize this. However, I don't know how to interact with HDFS and Parquet from a local environment. I want to know either:

1) How to do that

Or..

2) How to parallelize this process in a different way using PySpark

Benjamin W.
Jan van der Vegt
  • Your question is fine. This probably just isn't the forum for it, since it doesn't really seem to be a programming question. – John Hascall Feb 15 '16 at 15:59
  • Well, it's about implementing parallelization in PySpark – Jan van der Vegt Feb 15 '16 at 16:00
  • OK, that doesn't really come across in the question. My suggestion: make some kind of an attempt in pyspark, if it doesn't work, bring what you tried here (in a new question). – John Hascall Feb 15 '16 at 16:03
  • Well, I expanded the question again, I'll shoot a message to zero323. I don't know how to approach it which is why I asked for help :) thanks – Jan van der Vegt Feb 15 '16 at 16:12

1 Answer


I would suggest one of the following two approaches. In practice, I have found the first one to perform better.

Write each GZip file to a separate Parquet file

Here you can use pyarrow to write a Parquet file to HDFS:

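import pyarrow
import pyarrow.parquet

def convert_gzip_to_parquet(file_from, file_to):
    # read_gzip_file and to_pyarrow_table are placeholders you need to implement
    gzipped_csv = read_gzip_file(file_from)
    pyarrow_table = to_pyarrow_table(gzipped_csv)
    # Note: newer pyarrow releases replace HdfsClient with pyarrow.fs.HadoopFileSystem
    hdfs_client = pyarrow.HdfsClient()
    with hdfs_client.open(file_to, "wb") as f:
        pyarrow.parquet.write_table(pyarrow_table, f)

# Filename RDD contains tuples with file_from and file_to.
# foreach is an action, so the conversion actually runs on the executors;
# a bare map() is lazy and would not trigger any work on its own.
filenameRDD.foreach(lambda x: convert_gzip_to_parquet(x[0], x[1]))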

There are two ways to obtain pyarrow.Table objects:

  • either obtain it from a pandas DataFrame (in which case you can also use pandas' read_csv() function): pyarrow_table = pyarrow.Table.from_pandas(pandas_df)

  • or manually construct it using pyarrow.Table.from_arrays

For pyarrow to work with HDFS, several environment variables (such as HADOOP_HOME, JAVA_HOME and CLASSPATH) need to be set correctly; see the pyarrow documentation on HDFS connectivity.

Concatenate the rows from all GZip files into one Parquet file

def get_rows_from_gzip(file_from):
    # read_gzip_file is a placeholder; it should return a list of Row objects
    rows = read_gzip_file(file_from)
    return rows

# read the rows of each GZip file into a list of Row objects
rows_rdd = filenameRDD.map(lambda x: get_rows_from_gzip(x[0]))

# flatten list of lists
rows_rdd = rows_rdd.flatMap(lambda x: x)

# convert to DataFrame and write to Parquet
# (file_to is the single output path for the combined data)
df = spark_session.createDataFrame(rows_rdd)
df.write.parquet(file_to)

If you know the schema of the data in advance, passing a schema object to createDataFrame will speed up the creation of the DataFrame, since Spark then does not have to infer the schema from the data.
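The read_gzip_file helper in the snippets above is left undefined. A minimal sketch of the parsing part, using only the standard library, could look like the following. It assumes the files are gzip-compressed CSVs with a header row and that the raw bytes of a file are already available on the executor (how you fetch them from HDFS is up to you); the function name parse_gzip_csv_bytes is made up for the example.

```python
import csv
import gzip
import io


def parse_gzip_csv_bytes(raw):
    """Decompress gzip-compressed CSV bytes and return one dict per data row.

    Hypothetical helper: assumes UTF-8 text and a header line.
    All values are returned as strings; no type conversion is done.
    """
    text = gzip.decompress(raw).decode("utf-8")
    reader = csv.DictReader(io.StringIO(text))
    return [dict(row) for row in reader]


# Small in-memory example with made-up data
sample = gzip.compress(b"id,name\n1,alice\n2,bob\n")
rows = parse_gzip_csv_bytes(sample)
```

Each dict could then be turned into a Row (for example Row(**d)) before building the DataFrame. Note that this loads a whole file into memory at once, which is fine for moderately sized files but may be a problem for very large ones.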