
I'm posting this under the pandas, numpy, and spark tags because I'm not really sure of the best approach to this problem within those three systems.

I have a large parquet file that a downstream process is having trouble opening because it exceeds the system's memory (about 63 GB in memory if opened at once). I was writing the file like this:

FULL_MAIN.write.mode("overwrite").parquet(PATH+"/FULL_MAIN.parquet")

but the file was too big, so I tried breaking it into smaller chunks:

    split_factor = [.1,.1,.1,.1,.1,.1,.1,.1,.1,.1]
    FULL_MAIN_RDD1,FULL_MAIN_RDD2,FULL_MAIN_RDD3,FULL_MAIN_RDD4,FULL_MAIN_RDD5, FULL_MAIN_RDD6,FULL_MAIN_RDD7,FULL_MAIN_RDD8,FULL_MAIN_RDD9,FULL_MAIN_RDD10  = FULL_MAIN.randomSplit(split_factor)
    FULL_MAIN_RDD1.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD1.parquet")
    FULL_MAIN_RDD2.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD2.parquet")
    ...

The problem with this approach is that there are other dataframes whose rows I need to keep aligned with this one, and the random split breaks that alignment.

So my two questions are:

  1. Is there a way to split multiple dataframes into relatively equal chunks when I don't have a row number or numeric counter for each row in my dataset? (One possible workaround is sketched right after this list.)
  2. Is there a way to read parquet files in batches in pandas or numpy? This would basically solve my problem on the downstream system. I can't figure out how to open the parquet file in batches (I've tried opening it in pandas, splitting the rows, and saving each chunk to its own file, but loading the full dataframe crashes my system), and I'm not sure it's possible without exceeding memory.
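For question 1, here is a minimal sketch of one way this could work, assuming the dataframes that must stay aligned share a key column (the name row_key below is hypothetical): hashing that key into a fixed number of buckets gives the same split in every dataframe, unlike randomSplit.

    from pyspark.sql import functions as F

    N_CHUNKS = 10

    # "row_key" is a placeholder for whatever key column the dataframes share.
    # Hashing it into a fixed number of buckets sends the same row to the same
    # bucket in every dataframe, so the chunks stay aligned.
    def add_bucket(df, key_col="row_key", n=N_CHUNKS):
        return df.withColumn("bucket", F.abs(F.hash(key_col)) % n)

    # Writing partitioned by the bucket produces one sub-directory per chunk,
    # which the downstream process can open one at a time.
    (add_bucket(FULL_MAIN)
        .write.mode("overwrite")
        .partitionBy("bucket")
        .parquet(PATH + "/FULL_MAIN_bucketed.parquet"))

Applying the same add_bucket call to the related dataframes produces matching buckets, so chunk 3 of FULL_MAIN lines up with chunk 3 of each of the others.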
– Lostsoul
  • You could look at this link https://stackoverflow.com/questions/59098785/is-it-possible-to-read-parquet-files-in-chunks – user14518362 Jun 25 '21 at 01:13

2 Answers


The Parquet file format supports row groups. Install pyarrow and pass row_group_size when writing the parquet file:

df.to_parquet("filename.parquet", row_group_size=10000, engine="pyarrow")

Then you can read it group by group (or even only a specific group):

import pyarrow.parquet as pq

pq_file = pq.ParquetFile("filename.parquet")
n_groups = pq_file.num_row_groups
for grp_idx in range(n_groups):
    df = pq_file.read_row_group(grp_idx, use_pandas_metadata=True).to_pandas()
    process(df)

If you don't have control over the creation of the parquet file, you are still able to read only part of it at a time:

pq_file = pq.ParquetFile("filename.parquet")
batch_size = 10000 # records

batches = pq_file.iter_batches(batch_size, use_pandas_metadata=True) # batches will be a generator    
for batch in batches:
    df = batch.to_pandas()
    process(df)
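If memory is still tight, iter_batches also accepts a columns argument, so each batch only materializes the columns the downstream step actually needs; a small variant (the column names are placeholders):

import pyarrow.parquet as pq

pq_file = pq.ParquetFile("filename.parquet")

# Restrict each batch to the columns the downstream step actually uses;
# "col_a" and "col_b" are placeholder names.
for batch in pq_file.iter_batches(batch_size=10000, columns=["col_a", "col_b"]):
    df = batch.to_pandas()
    process(df)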
– igrinis

I am not sure if you have Spark available. If you want to provide the downstream process with smaller chunks of the file, you can repartition to the desired number of chunks and rewrite the parquet file. You can change the repartition number as per your need.

df = spark.read.parquet('filename.parquet')
# repartition controls how many part files get written out
df.repartition(200).write.mode('overwrite').parquet('targetPath')
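On the downstream side, the repartitioned output is a directory of part files, so the consumer can load them one at a time instead of all ~63 GB at once; a minimal sketch in pandas (the path and the process step are placeholders):

import glob

import pandas as pd

# Each part file holds roughly 1/200th of the data, so it can be processed on its own.
for part in sorted(glob.glob('targetPath/part-*.parquet')):
    df = pd.read_parquet(part)
    process(df)  # placeholder for the downstream processing step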
– Rafa