
I am doing some repetitive tasks on a large number of files, and I'd like to run these tasks in parallel.

Each task is in a function that looks like:

def function(file):
    ...
    return var1, var2, ...

And I manage to run all of this in parallel using:

import concurrent.futures as Cfut     
executor = Cfut.ProcessPoolExecutor(Nworkers)
futures = [executor.submit(function, file) for file in list_files]
Cfut.wait(futures)

What I want to do is:

  1. Find a way to get var1, var2, var3 back into another variable.
  2. Write a function that handles the whole parallelization process.
  3. Since each individual task is very quick, have each worker handle a group of files instead of a single one.

Here is what I have written so far:

def function(files):
    for file in files:
        ...
        print(var1, var2, ...)

def multiprocess_loop_grouped(function, param_list, group_size, Nworkers):
    # function : function that is running in parallel
    # param_list : list of items
    # group_size : size of the groups
    # Nworkers : number of groups running at the same time

    executor = Cfut.ProcessPoolExecutor(Nworkers)   
    futures = [executor.submit(function, param) 
           for param in grouper(param_list, group_size)]
    Cfut.wait(futures)
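
Note: grouper is not defined above; it is assumed to split param_list into chunks of group_size. A minimal sketch of such a helper (one possible implementation, not necessarily the original) using itertools.islice:

import itertools

def grouper(iterable, n):
    # yield successive chunks of at most n items from iterable
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk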

If I just print var1, var2, etc., it works, but I need to collect these results into an array or something similar.

Liris
  • Look at how to share state between processes: https://docs.python.org/3.6/library/multiprocessing.html#sharing-state-between-processes – Andrej Kesely Jul 18 '18 at 14:04
  • I also found this topic: [link](https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce). Should I move from concurrent.futures to the multiprocessing lib in order to use this shared memory? Or is there any way to implement it with concurrent.futures? – Liris Jul 18 '18 at 14:15
  • You can use `multiprocessing` with concurrent.futures. That shouldn't be a problem. – Andrej Kesely Jul 18 '18 at 14:27
  • Honestly an async processing structure like gevent would make this far easier and probably offer the same performance. – eatmeimadanish Jul 18 '18 at 14:52
  • I would need a link or a minimal example to see how I can go from this "pool-of-workers" implementation to gevent! – Liris Jul 18 '18 at 15:05

1 Answer


Using Andrej Kesely's comment and the multiprocessing library, I managed to write something that works using a shared dictionary.

import concurrent.futures as Cfut
import multiprocessing as mlp

def function(files, dic):
    for file in files:
        ...
        # i must be a key that uniquely identifies this file across all
        # groups, e.g. the file's index in the full list
        dic[i] = var1, var2, ...

def multiprocess_loop_grouped(function, param_list, group_size, Nworkers):
    # function : function that is run in parallel
    # param_list : list of items
    # group_size : size of the groups
    # Nworkers : number of groups running at the same time

    # the Manager hosts a dictionary that all worker processes can write to
    manager = mlp.Manager()
    dic = manager.dict()

    executor = Cfut.ProcessPoolExecutor(Nworkers)
    futures = [executor.submit(function, param, dic)
               for param in grouper(param_list, group_size)]
    Cfut.wait(futures)

    # collect the results back into an ordered list
    return [dic[i] for i in range(len(dic))]
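
A minimal usage sketch, assuming the grouper and multiprocess_loop_grouped definitions above are in scope. The file names and the process_files worker are illustrative only; the worker receives (index, file) pairs so it knows which key to write into dic:

def process_files(pairs, dic):
    # hypothetical worker: each item is a (global index, filename) pair
    for i, file in pairs:
        var1, var2 = len(file), file.upper()   # placeholder per-file work
        dic[i] = var1, var2

if __name__ == '__main__':
    list_files = ['a.txt', 'b.txt', 'c.txt', 'd.txt']
    # pair each file with its index so the results can be ordered afterwards
    indexed = list(enumerate(list_files))
    results = multiprocess_loop_grouped(process_files, indexed, group_size=2, Nworkers=2)
    print(results)

Note that the Manager dictionary is hosted by a separate server process, so writes to dic from the workers go through inter-process communication.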
Liris