
I have a few hundred thousand CSV files to which I would like to apply the same function. Something like the following dummy function:

import os
import pandas as pd

def process_single_file(fname):
    df = pd.read_csv(fname)
    # Pandas and non-pandas processing
    df.to_csv(f"./output/{os.path.basename(fname)}")

As looping over all files sequentially would take too long, my question is: what is the most efficient way to schedule and parallelize this execution, given that no task depends on any other? I started off trying to use Python's multiprocessing:

import glob
import multiprocessing

files = sorted(glob.glob("./input/*.csv"))

processes = []
for fname in files:
    p = multiprocessing.Process(target=process_single_file, args=(fname,))
    processes.append(p)
    p.start()

for process in processes:
    process.join()

My computer, however, doesn't seem to like this: it quickly overloads all CPUs, leading to slow-downs and crashes. Is there a more efficient way to limit the load on the CPUs and schedule the tasks, for example using Dask, a Bash script, or a different approach in Python? Thanks in advance.

BBQuercus

3 Answers


It really depends on where your bottleneck is: are you spending more time reading and writing files, or doing CPU-heavy processing?

This RealPython tutorial really helped me learn about all of this; I can only recommend it as a good read ;)

As explained in the tutorial, if your workload is I/O-bound, multithreading is enough (and possibly better than multiprocessing):

import concurrent.futures

def process_all_files(files):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(process_single_file, files)

And if it is CPU-bound, multiprocessing will let you use all your available cores:

import multiprocessing

def process_all_files(files):
    with multiprocessing.Pool() as pool:
        pool.map(process_single_file, files)
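
One knob worth knowing about with hundreds of thousands of tiny tasks: pool.map accepts a chunksize argument that batches files per worker and cuts inter-process overhead. A minimal sketch under the same assumptions as above (process_single_file and files come from the question; the value 100 is an arbitrary example to tune for your workload):

import multiprocessing

def process_all_files(files):
    with multiprocessing.Pool() as pool:
        # send files to workers in batches of 100 instead of one at a time
        pool.map(process_single_file, files, chunksize=100)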
Big Bro

You can try Ray; it is a quite efficient module to parallelize tasks.
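
For example, a minimal sketch of how Ray could be applied here (assuming process_single_file and files as defined in the question; the wrapper name process_remote is just for illustration, and Ray schedules tasks across the CPUs it detects):

import ray

ray.init()  # start Ray and detect the available CPUs

@ray.remote
def process_remote(fname):
    process_single_file(fname)

# Submit one task per file; Ray queues and schedules them across its worker pool
futures = [process_remote.remote(fname) for fname in files]
ray.get(futures)  # block until all tasks have finished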

SteRinaldi
  • This is just an opinion. For stackoverflow, you should at least show *how* you would use ray, and what makes you think it would overcome the original problem. – mdurant Aug 05 '20 at 12:58

Pool is absolutely the way to go. Something along the lines of the following:

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)

Check the following post:

Using multiprocessing.Process with a maximum number of simultaneous processes
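
Applied to the question, that pattern might look like this (a sketch, assuming process_single_file and files from the question; the process count of 4 is an arbitrary example of capping the number of workers):

from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(processes=4) as pool:  # limit the load to 4 worker processes
        pool.map(process_single_file, files)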

A Modgil