
I am attempting to read a very large Parquet file (10 GB), and I have no control over how it is generated (i.e. I cannot make the file parts smaller, for example).

What is the best way to read/write this data? I'm thinking of either streaming it from the file or using a buffer.

My current code looks like this:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

if __name__ == "__main__":
    sc = SparkContext(appName="Parquet2CSV")
    sqlContext = SQLContext(sc)
    # Read the whole Parquet file into a DataFrame, then write it back out as CSV
    readdf = sqlContext.read.parquet('infile.parquet')
    readdf.write.csv('outfile.csv')

This works fine for small files, but for the large one it fails (essentially it blows my heap).

I was able to get a successful return code without the write, but with the write it fails.

What would be the best way to do this for large files?

Nate
  • Are you tied to Python 2.7? – James May 31 '18 at 15:18
  • Do you know how many partitions there are in readdf? You could check using readdf.rdd.partitions.size. It could be that there are too few partitions, and this is leading to a lot of memory being chewed up in the executors on writes to CSV. If your Parquet file is 10 GB, you can assume your uncompressed CSV will be an order of magnitude larger. You could try repartitioning to a larger number of partitions before writing, something like readdf.repartition(100).write.csv('outfile.csv') (see the sketch after these comments). – DemetriKots May 31 '18 at 15:42
  • @James No, I am not (at least not at this time). @DemetriKots readdf.rdd.partitions.size doesn't seem to be valid (partitions was not an attribute on the RDD); I used `readdf.rdd.getNumPartitions()` instead and got `63` as the answer. The total count from readdf.count() returns `66555762`, so I'm not sure if that is too small. – Nate May 31 '18 at 15:52
  • You can try iterating over the rows and manually appending to the CSV: https://stackoverflow.com/questions/33030726/how-to-iterate-records-spark-scala – BallpointBen May 31 '18 at 16:36
  • @nate Do you have any log output that you could share? Given the number of rows, it doesn't seem like it should blow up on write unless they are very wide. I am much more comfortable on the Scala side of things, so I am not sure if this is some peculiarity with PySpark specifically. – DemetriKots May 31 '18 at 17:52
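
To make the repartition suggestion in the comments concrete, here is a minimal sketch of how it could slot into the script from the question. It reuses the question's 'infile.parquet' / 'outfile.csv' paths and takes the 100 partitions from the comment purely as an example value to tune; this is one possible mitigation, not a confirmed fix.

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="Parquet2CSV")
    sqlContext = SQLContext(sc)

    readdf = sqlContext.read.parquet('infile.parquet')

    # Spread the data over more partitions before writing so each executor
    # task only materializes a smaller slice of the (much larger) CSV output.
    # 100 is the example count from the comment above; tune it for your data.
    readdf.repartition(100).write.csv('outfile.csv')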

1 Answer


Depending on the row group size of the Parquet file, you might want to try using fastparquet. It is available on PyPI and in the conda repositories. Because it iterates over the file one row group at a time, memory usage stays bounded by the size of a row group rather than the whole file.

import fastparquet

# Open the Parquet file without loading its data into memory
pfile = fastparquet.ParquetFile('infile.parquet')

with open('out.csv', 'w') as fp:
    # iter_row_groups() yields one pandas DataFrame per row group,
    # so only a single row group is held in memory at a time
    for i, df in enumerate(pfile.iter_row_groups()):
        write_header = (i == 0)  # only write the CSV header once
        df.to_csv(fp, index=False, header=write_header)
James
  • Replied earlier... the count returned was `66555762` with ~15 columns and 63 partitions. – Nate May 31 '18 at 16:13