I have been trying to merge small Parquet files, each with 10k rows. Each set contains 60-100 small files, so the merged Parquet file ends up with at least around 600k rows.

I have been using pandas concat. It works fine when merging around 10-15 small files.

But a set may consist of 50-100 files, and in that case the Python script gets killed because it breaches the memory limit.

So I am looking for a memory-efficient way to merge any number of small Parquet files, for sets of around 100 files.

I used pandas read_parquet to read each file into a dataframe and combined them with pd.concat(all dataframes).

Is there a better library than pandas for this, or, if it is possible in pandas, how can it be done efficiently?

Time is not a constraint. It can run for a long time as well.

James Z
Boss Boss
  • Have you tried using the dask library instead of pandas? You can use `dd.read_parquet("path/to/files/*.parquet").compute().to_parquet('path/to/merged_file.parquet')` to read, merge and write to a file. It's supposed to be faster and more memory efficient, but I haven't tested the speed and memory. – Ashyam Feb 16 '23 at 06:35
  • Okay, I will check it out. – Boss Boss Feb 16 '23 at 07:36
  • It didn't work out; it is breaching even higher limits than normal. I guess it is due to multiprocessing. – Boss Boss Feb 17 '23 at 03:10

2 Answers


You can open files one by one and append them to the parquet file. Best to use pyarrow for this.

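import pyarrow.parquet as pq

files = ["table1.parquet", "table2.parquet"]

# Take the output schema from the first file, then append the files one at a time
# so that only one input file is held in memory at once.
with pq.ParquetWriter("output.parquet", schema=pq.ParquetFile(files[0]).schema_arrow) as writer:
    for file in files:
        writer.write_table(pq.read_table(file))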
0x26res
  • Thank you, this seems to work, but even though the schema is the same it throws an exception: ValueError: Table schema does not match schema used to create file Table: 'column_indexes':[{"na"+ 58501 vs. – Boss Boss Feb 16 '23 at 15:14
  • __index_level_0__: int64 -- schema metadata -- pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 58501 __index_level_0__: int64 -- schema metadata -- pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 58501 vs. This is what it shows for the file and the table. – Boss Boss Feb 16 '23 at 15:24
  • It looks like your files don't have the same schema. This can happen if you generated the Parquet files with pandas, which doesn't do a good job of guessing what the column types are. You can try to cast the data as you read it, but in practice it may be best to make sure the data you generate has a consistent schema. – 0x26res Feb 16 '23 at 15:52
  • It worked with a slight modification to the read_table call, passing the same schema like this: writer.write_table(pq.read_table(file, schema=pq.ParquetFile(all_parquet_files[0]).schema_arrow)). It reduced the memory consumption by about 50%. Thank you. But my requirement is to reduce it by 80-90 percent, and I have only got 50 percent so far. – Boss Boss Feb 16 '23 at 17:14
  • You can try working at record batch level, using a ParquetFile (instead of pq.read_table). But if your files are small it's not going to make a big difference. https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html#pyarrow.parquet.ParquetFile.iter_batches – 0x26res Feb 17 '23 at 09:21
  • Given that Pandas is backed by pyarrow, why do you prefer this method to pd.read, pd.concat, pd.to_parquet? – Coriolanus Aug 25 '23 at 12:01
  • pandas has only been backed by pyarrow since version `2.0.0`. Also, this method only loads one file in memory at a time. If you call `pd.concat` you'd have to load every file in memory, and they may not all fit. – 0x26res Aug 25 '23 at 16:07
  • Thanks for your reply. This answer https://stackoverflow.com/questions/50299815/effectively-merge-big-parquet-files seems to indicate reading all the files in at once is the only way to get the performance improvement from aggregating Parquet files. Is this correct/has the situation changed? – Coriolanus Aug 26 '23 at 00:57
  • I guess it depends what you are trying to optimise for. Memory usage of the concatenating job? Speed of the concatenating job? Optimal structure of the row group in the parquet file? This question is about memory. The question you are pointing out is about the size of the row group in the concatenated parquet file. – 0x26res Aug 27 '23 at 09:41
  • Isn't the main reason to merge Parquet files the performance benefit I mentioned (perhaps there are space considerations -- would we get them from the approach you outlined?)? The issue is that if I try to do the recommended (elsewhere) approach of reading in the files in Spark to achieve the recommended 1 GB on-disk size of the file, the uncompressed in-memory partition size can be ~ 2GB and crash the job (or even 1 GB and be extremely slow). – Coriolanus Aug 27 '23 at 12:25
  • You may want to submit a new question because I don't think your use case relates to this one. – 0x26res Aug 27 '23 at 12:34

For large data you should definitely use the PySpark library, split into smaller sizes if possible, and then use Pandas. PySpark is very similar to Pandas.


  • Since we are writing a single large Parquet file, wouldn't there potentially be a problem with large partitions (e.g., > 1 GB in memory) for Spark? – Coriolanus Aug 25 '23 at 12:03