17

Here is my question.
With bunch of .csv files(or other files). Pandas is an easy way to read them and save into Dataframe format. But when the amount of files was huge, I want to read the files with multiprocessing to save some time.

My early attempt

I manually divide the files into different path. Using severally:

os.chdir("./task_1")
files = os.listdir('.')
files.sort()
for file in files:
    filename,extname = os.path.splitext(file)
    if extname == '.csv':
        f = pd.read_csv(file)
        df = (f.VALUE.as_matrix()).reshape(75,90)   

And then combine them.

How to run them with pool to achieve my problem?
Any advice would be appreciated!

FATEH ALIYEV
  • 61
  • 10
Han Zhengzu
  • 3,694
  • 7
  • 44
  • 94
  • 1
    Worth reading if you haven't seen it already: http://stackoverflow.com/questions/14262433/large-data-work-flows-using-pandas or https://www.reddit.com/r/Python/comments/3been9/pandas_speed_up_read_csv_with_multiprocessing/ – Alexander Apr 13 '16 at 02:13

4 Answers4

34

Using Pool:

import os
import pandas as pd 
from multiprocessing import Pool

# wrap your csv importer in a function that can be mapped
def read_csv(filename):
    'converts a filename to a pandas dataframe'
    return pd.read_csv(filename)


def main():

    # get a list of file names
    files = os.listdir('.')
    file_list = [filename for filename in files if filename.split('.')[1]=='csv']

    # set up your pool
    with Pool(processes=8) as pool: # or whatever your hardware can support

        # have your pool map the file names to dataframes
        df_list = pool.map(read_csv, file_list)

        # reduce the list of dataframes to a single dataframe
        combined_df = pd.concat(df_list, ignore_index=True)

if __name__ == '__main__':
    main()
zemekeneng
  • 1,660
  • 2
  • 15
  • 26
7

dask library is designed to address not only but certainly your issue.

Zeugma
  • 31,231
  • 9
  • 69
  • 81
2

I am not getting map/map_async to work, but managed to work with apply_async.

Two possible ways (I have no idea which one is better):

  • A) Concat at the end
  • B) Concat during

I find glob easy to list and fitler files from a directory

from glob import glob
import pandas as pd
from multiprocessing import Pool

folder = "./task_1/" # note the "/" at the end
file_list = glob(folder+'*.xlsx')

def my_read(filename):
    f = pd.read_csv(filename)
    return (f.VALUE.as_matrix()).reshape(75,90)

#DF_LIST = [] # A) end
DF = pd.DataFrame() # B) during

def DF_LIST_append(result):
    #DF_LIST.append(result) # A) end
    global DF # B) during
    DF = pd.concat([DF,result], ignore_index=True) # B) during

pool = Pool(processes=8)

for file in file_list:
    pool.apply_async(my_read, args = (file,), callback = DF_LIST_append)

pool.close()
pool.join()

#DF = pd.concat(DF_LIST, ignore_index=True) # A) end

print(DF.shape)
famaral42
  • 695
  • 7
  • 9
1

If you aren't against using another library, you could use Graphlab's sframe. This creates an object similar to data frames which is very fast to read data if performance is a big issue.

faskiat
  • 689
  • 2
  • 6
  • 21