14

Is it possible to use Pandas' DataFrame.to_parquet functionality to split writing into multiple files of some approximate desired size?

I have a very large DataFrame (100M x 100), and am using df.to_parquet('data.snappy', engine='pyarrow', compression='snappy') to write to a file, but this results in a file that's about 4GB. I'd instead like this split into many ~100MB files.

Austin
  • 4
    Do the same thing as [Pandas dataframe to\_csv - split into multiple output files](https://stackoverflow.com/questions/44502306/pandas-dataframe-to-csv-split-into-multiple-output-files), except with `.to_parquet()`. – Trenton McKinney Sep 06 '20 at 22:42
  • 3
    Ended up doing `ddf = dask.dataframe.from_pandas(df, chunksize=5000000); ddf.to_parquet('/path/to/save/')` which saves one file per chunk. – Austin Sep 07 '20 at 03:53
  • 3
    You should write it up as an answer. May be beneficial to others. – Trenton McKinney Sep 07 '20 at 03:55

5 Answers

18

I ended up using Dask:

import dask.dataframe as da

ddf = da.from_pandas(df, chunksize=5000000)
save_dir = '/path/to/save/'
ddf.to_parquet(save_dir)

This writes multiple Parquet files into save_dir, one per partition, each holding up to chunksize rows. The resulting file size depends on your dtypes and number of columns, so adjust chunksize until the files reach the desired size.
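One way to pick a starting chunksize without trial and error is to measure bytes per row on disk and scale to the target file size. The sketch below is an illustration, not part of the original answer: the helper names `rows_per_file` and `estimate_chunksize` are hypothetical, and `estimate_chunksize` assumes pandas 1.2 or later (where `to_parquet()` with no path returns bytes) and that the first rows are representative of the whole frame.

```python
def rows_per_file(n_rows, total_bytes, target_bytes):
    """Rows per output file so that each file is roughly target_bytes on disk."""
    if n_rows <= 0 or total_bytes <= 0:
        raise ValueError("n_rows and total_bytes must be positive")
    return max(1, n_rows * target_bytes // total_bytes)


def estimate_chunksize(df, target_bytes=100_000_000, sample_rows=100_000, **parquet_kwargs):
    """Estimate rows per file by serialising a sample of df to Parquet in memory.

    Assumption: the first sample_rows rows compress about as well as the rest.
    """
    sample = df.iloc[:sample_rows]
    # With no path argument, to_parquet returns the file contents as bytes.
    sample_bytes = len(sample.to_parquet(**parquet_kwargs))
    return rows_per_file(len(sample), sample_bytes, target_bytes)


# Figures from the question: 100M rows written as a single file of about 4 GB,
# with a target of about 100 MB per file.
print(rows_per_file(100_000_000, 4_000_000_000, 100_000_000))  # 2500000
```

The result can be passed as `chunksize` to `from_pandas`. Compression ratios vary between chunks, so treat the number as a first guess and check the files that come out.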

Austin
7

Another option is to use the partition_cols argument of pyarrow.parquet.write_to_dataset():

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# df is your dataframe
n_partition = 100
# Assign each row to a random partition; note that this adds a column to df
df["partition_idx"] = np.random.choice(n_partition, size=len(df))
# Table lives in the top-level pyarrow module, not in pyarrow.parquet
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_to_dataset(table, root_path="{path to dir}/", partition_cols=["partition_idx"])
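Random assignment gives partitions of only roughly equal size and scatters neighbouring rows across files. If equal-sized files that preserve row order are preferred, one alternative is to number the rows in contiguous blocks. This is a sketch and not part of the answer above, and the helper name `contiguous_partition_ids` is hypothetical.

```python
import numpy as np


def contiguous_partition_ids(n_rows, n_partitions):
    """Assign rows to contiguous blocks of equal size (the last may be shorter)."""
    if n_partitions <= 0:
        raise ValueError("n_partitions must be positive")
    rows_per_partition = max(1, -(-n_rows // n_partitions))  # ceiling division
    return np.arange(n_rows) // rows_per_partition


print(contiguous_partition_ids(10, 3).tolist())
# [0, 0, 0, 0, 1, 1, 1, 1, 2, 2]
```

Assigning the result to `df["partition_idx"]` in place of the random draw leaves the rest of the snippet unchanged.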
Random Certainty
3

Slice the DataFrame and save each chunk to a folder, using only the pandas API (no Dask and no direct pyarrow calls; pyarrow is still required as the engine).

You can pass extra parameters to the Parquet engine if you wish.

import os

def df_to_parquet(df, target_dir, chunk_size=1000000, **parquet_wargs):
    """Writes pandas DataFrame to parquet format with pyarrow.

    Args:
        df: DataFrame
        target_dir: local directory where parquet files are written to
        chunk_size: number of rows stored in one chunk of parquet file. Defaults to 1000000.
        **parquet_wargs: extra keyword arguments passed to DataFrame.to_parquet
    """
    os.makedirs(target_dir, exist_ok=True)  # create the output directory if needed
    for i in range(0, len(df), chunk_size):
        slc = df.iloc[i : i + chunk_size]
        chunk = i // chunk_size  # zero-based chunk number
        fname = os.path.join(target_dir, f"part_{chunk:04d}.parquet")
        slc.to_parquet(fname, engine="pyarrow", **parquet_wargs)
Maciej Skorski
3

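Keep each Parquet file small, around 128MB. To do this:

import dask.dataframe as dd

# Get the number of partitions required for a nominal 128MB partition size.
# This uses the in-memory size, so the files on disk can differ (compressed
# Parquet is often smaller); memory_usage(deep=True) also counts string contents.
# "+ 1" accounts for the final, partially filled partition.
size128MB = int(df.memory_usage().sum()/1e6/128) + 1
# Convert to a Dask DataFrame with that many partitions
ddf = dd.from_pandas(df, npartitions=size128MB)
save_dir = '/path/to/save/'
ddf.to_parquet(save_dir)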
winderland
0
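chunk = 200000
i = 0
n = 0
# Use "<" so that no empty file is written when len(all_df) is a multiple of chunk
while i < len(all_df):
    j = i + chunk
    print((i, j))
    tmpdf = all_df.iloc[i:j]
    # The ./append_data directory must already exist
    tmpdf.to_parquet(path=f"./append_data/part.{n}.parquet", engine='pyarrow', compression='snappy')
    i = j
    n = n + 1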
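To check how many files a given chunk size produces before writing anything, the slice boundaries can be computed on their own. A minimal sketch follows; the helper name `chunk_bounds` is hypothetical and not part of the answer above.

```python
def chunk_bounds(n_rows, chunk_size):
    """Yield (start, stop) row ranges that cover n_rows with no empty trailing chunk."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, n_rows, chunk_size):
        yield start, min(start + chunk_size, n_rows)


print(list(chunk_bounds(500_000, 200_000)))
# [(0, 200000), (200000, 400000), (400000, 500000)]
```

Each pair can be used directly as `all_df.iloc[start:stop]`, and enumerating the pairs gives the file number.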
L bear