
I have 7 CSV files of 8 GB each and need to convert them to Parquet.

Memory usage climbs to 100 GB and I had to kill the process. I tried distributed Dask as well: memory stayed limited to 12 GB, but no output was produced for a long time. FYI, with traditional pandas using chunking plus a producer/consumer pattern I was able to convert the files in 30 minutes. What am I missing in my Dask processing?

import dask.dataframe as dd

def ProcessChunk(df, ...):
    df.to_parquet()

for factfile in fArrFileList:
    # read each CSV lazily in 100 MB blocks
    df = dd.read_csv(factfile, blocksize="100MB",
                     dtype=fColTypes, header=None, sep='|', names=fCSVCols)
    result = ProcessChunk(df, output_parquet_file, chunksize, fPQ_Schema, fCSVCols, fColTypes)
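For reference, the pandas chunking + producer/consumer pipeline that did finish in ~30 minutes looked roughly like this (a simplified sketch; the queue size and threading layout are illustrative, not my exact code):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from queue import Queue
from threading import Thread

def producer(filename, q):
    # read the CSV in fixed-size chunks and hand them to the consumer
    for chunk in pd.read_csv(filename, sep='|', header=None,
                             names=fCSVCols, dtype=fColTypes, chunksize=1_000_000):
        q.put(chunk)
    q.put(None)  # sentinel: no more chunks

def consumer(q, writer):
    # append each chunk to a single parquet file
    while True:
        chunk = q.get()
        if chunk is None:
            break
        writer.write_table(pa.Table.from_pandas(chunk, schema=fPQ_Schema,
                                                preserve_index=False))

q = Queue(maxsize=4)
writer = pq.ParquetWriter(output_parquet_file, fPQ_Schema)
t = Thread(target=consumer, args=(q, writer))
t.start()
producer(factfile, q)
t.join()
writer.close()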
  • Hi, I'm not really into Python and Dask, but the problem of loading large files is quite common. Take a look here, maybe: https://pythondata.com/dask-large-csv-python/ – djleop Jun 03 '20 at 20:40

2 Answers


Thanks all for the suggestions; map_partitions worked.

import dask.dataframe as dd

df = dd.read_csv(filename, blocksize="500MB",
                 dtype=fColTypes, header=None, sep='|', names=fCSVCols)
# DoWork is applied to each pandas partition; compute() triggers the work
df.map_partitions(DoWork, output_parquet_file, chunksize, Schema, CSVCols, fColTypes).compute(num_workers=2)

But the same approach didn't work well with a Dask distributed local cluster; in local-cluster mode it only worked when the CSV size was under 100 MB.
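For completeness, the local-cluster setup looked roughly like this (a sketch; the worker count and memory limit are illustrative):

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

# two workers, each capped at 12 GB
cluster = LocalCluster(n_workers=2, memory_limit="12GB")
client = Client(cluster)

df = dd.read_csv(filename, blocksize="500MB",
                 dtype=fColTypes, header=None, sep='|', names=fCSVCols)
df.map_partitions(DoWork, output_parquet_file, chunksize, Schema, CSVCols, fColTypes).compute()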


I had a similar problem and found that using Dask to split large CSVs into smaller Parquet files is very slow and will eventually fail. If you have access to a Linux terminal you can use parallel or split; for an example of their usage, check the answers from here.

My workflow assumes your files are called file1.csv, ..., file7.csv and are stored in data/raw. I'm also assuming you run the terminal commands from a notebook, which is why I'm adding the %%bash magic.

  • create folders data/raw_parts/part1/, ..., data/raw_parts/part7/
%%bash
for i in {1..7}
do
mkdir -p data/raw_parts/part${i}
done
  • for each file, run the following (in case you want to use parallel; a Python alternative is sketched below)
%%bash
cat data/raw/file1.csv | parallel --header : --pipe -N1000000 'cat >data/raw_parts/part1/file_{#}.csv'
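If you don't have GNU parallel available, a rough Python equivalent of this splitting step, using pandas' chunked reader instead (file and folder names follow the layout above), could look like:

import pandas as pd

# split file1.csv into ~1,000,000-row pieces, mirroring the parallel command above
reader = pd.read_csv("data/raw/file1.csv", chunksize=1_000_000)
for i, chunk in enumerate(reader, start=1):
    chunk.to_csv(f"data/raw_parts/part1/file_{i}.csv", index=False)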

Then convert the files to Parquet:

  1. First create the output folders:
%%bash
for i in {1..7}
do
mkdir -p data/processed/part${i}
done
  2. Define a function to convert a CSV file to Parquet:
import pandas as pd
import os
from dask import delayed, compute

# this can run in parallel
@delayed
def convert2parquet(fn, fldr_in, fldr_out):
    fn_out = fn.replace(fldr_in, fldr_out)\
               .replace(".csv", ".parquet")

    df = pd.read_csv(fn)
    df.to_parquet(fn_out, index=False)
  3. Get all the files you want to convert:
jobs = []
fldr_in = "data/raw_parts/"
for (dirpath, dirnames, filenames) in os.walk(fldr_in):
    if len(filenames) > 0:
        jobs += [os.path.join(dirpath, fn)
                 for fn in filenames]
  4. Process them all in parallel:
%%time
fldr_out = "data/processed/"
to_process = [convert2parquet(job, fldr_in, fldr_out) for job in jobs]
out = compute(to_process)
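By default the delayed tasks run on Dask's threaded scheduler; since pandas CSV parsing is largely CPU-bound, it may be worth forcing the multiprocessing scheduler instead (a hedged suggestion, not something measured here; the worker count is illustrative):

# run the delayed tasks in separate processes rather than threads
out = compute(to_process, scheduler="processes", num_workers=4)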