
I am trying to parallelize some calculations using the multiprocessing module.

How can I be sure that every process spawned by multiprocessing.Pool.map_async runs in a different (previously created) folder?

The problem is that each process calls a third-party library that writes temp files to disk, and if many of them run in the same folder, they mess up one another's files.

Additionally, I can't create a new folder for every function call made by map_async; rather, I would like to create as few folders as possible (i.e., one per process).

The code would be similar to this:

import multiprocessing, os, shutil
processes = 16

# starting pool
pool = multiprocessing.Pool(processes)

# The asked dark-magic here?

devshm = '/dev/shm/'
# Creating as many folders as necessary
for p in range(processes):
    path = devshm + str(p) + '/'
    os.mkdir(path)
    shutil.copy(some_files, path)   # some_files is a placeholder

def example_function(i):
    print(os.getcwd())
    return i*i

result = pool.map_async(example_function, range(1000))

So that at any time, every call of example_function is executed in a different folder.

I know that a solution might be to use subprocess to spawn the different processes, but I would like to stick with multiprocessing: with subprocess I would need to pickle some objects, write them to disk, read them back and unpickle them for every spawned process, rather than passing the object itself through the function call (using functools.partial).
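
For reference, the alternative I would like to keep looks roughly like this (heavy_calculation and big_object are just placeholders here, not my real code):

import functools, multiprocessing

def heavy_calculation(big_object, i):
    # placeholder for the real work that needs big_object
    return i*i

if __name__ == '__main__':
    big_object = {'some': 'shared data'}   # placeholder object
    pool = multiprocessing.Pool(16)
    # functools.partial binds big_object, so map_async only iterates over i
    result = pool.map_async(functools.partial(heavy_calculation, big_object), range(1000))
    print(result.get())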

PS.

This question is somewhat similar, but that solution doesn't guarantee that every function call takes place in a different folder, which is indeed my goal.


1 Answer


Since you don't specify otherwise in your question, I'm assuming you don't need the contents of the directory after your function has finished executing.

The absolute easiest method is to create and destroy the temp directories in the function that uses them. That way the rest of your code doesn't care about the environment/directories of the worker processes, and Pool fits nicely. I would also use Python's built-in functionality for creating temporary directories:

import multiprocessing, os, shutil, tempfile
processes=16

def example_function(i):
    # each call works inside its own private temp directory,
    # which is removed automatically when the with-block exits
    with tempfile.TemporaryDirectory() as path:
        os.chdir(path)
        print(os.getcwd())
        return i*i

if __name__ == '__main__':
    #starting pool
    pool=multiprocessing.Pool(processes)

    result=pool.map(example_function,range(1000))

NOTE: tempfile.TemporaryDirectory was introduced in Python 3.2. If you are using an older version of Python, you can copy the wrapper class into your code.
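
If you are stuck on an older interpreter, a minimal stand-in built from tempfile.mkdtemp and shutil.rmtree could look something like this (a sketch, not the stdlib class itself):

import contextlib, shutil, tempfile

@contextlib.contextmanager
def temporary_directory():
    # create a unique directory and remove it (with its contents) on exit
    path = tempfile.mkdtemp()
    try:
        yield path
    finally:
        shutil.rmtree(path)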

If you really need to set up the directories beforehand...

Trying to make this work with Pool is a little hacky. You could pass the name of the directory to use along with each piece of data, but you could only submit an initial batch of items equal to the number of directories. Then you would need to use something like imap_unordered to see when a result is done (and its directory is available for reuse), as in the sketch below.
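
A sketch of that idea, assuming the per-process folders under /dev/shm already exist and that a folder can be handed to the next piece of work as soon as the previous result using it has been collected (the helper names here are mine, not anything from your code):

import multiprocessing, os
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def example_function(args):
    # each task carries the folder it is allowed to work in
    path, i = args
    os.chdir(path)
    return path, i*i

if __name__ == '__main__':
    processes = 16
    dirs = ['/dev/shm/' + str(p) + '/' for p in range(processes)]  # assumed to exist already

    free_dirs = queue.Queue()
    for d in dirs:
        free_dirs.put(d)

    def tasks():
        # hand out work only when a folder is free; get() blocks otherwise
        for i in range(1000):
            yield free_dirs.get(), i

    pool = multiprocessing.Pool(processes)
    results = []
    for path, value in pool.imap_unordered(example_function, tasks()):
        free_dirs.put(path)        # this folder can be reused now
        results.append(value)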

A much better approach, in my opinion, is not to use Pool at all, but to create individual Process objects and assign each one to a directory. This is generally better if you need to control some part of the Process's environment, whereas Pool is generally better when your problem is data-driven and doesn't care about the processes or their environment.

There are different ways to pass data to/from the Process objects, but the simplest is a queue:

import multiprocessing,os,shutil
processes=16

in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()

def example_function(path, qin, qout):
    # each worker stays in its own pre-created folder for its whole lifetime
    os.chdir(path)

    for i in iter(qin.get, 'stop'):
        print(os.getcwd())
        qout.put(i*i)


devshm='/dev/shm/'
# create processes & folders
procs = []
for i in range(processes):
    path = devshm+str(i)+'/'
    os.mkdir(path)
    #shutil.copy(some_files,path)

    procs.append(multiprocessing.Process(target=example_function, args=(path,in_queue, out_queue)))
    procs[-1].start()


# send input
for i in range(1000):
    in_queue.put(i)
# send stop signals
for i in range(processes):
    in_queue.put('stop')

# collect output
results = []
for i in range(1000):
    results.append(out_queue.get())

# wait for the workers to shut down after their 'stop' sentinel
for p in procs:
    p.join()
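
Finally, since we assumed the folder contents aren't needed once the results are in, the /dev/shm folders can be removed at the end:

# remove the per-process folders now that the workers have exited
for i in range(processes):
    shutil.rmtree(devshm + str(i) + '/')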