
I have many Python functions (each one is a member function of a different class), and I wish to run x of them in parallel at any given moment (unless there are fewer than x left).

In other words, I would like to have a queue of all tasks to be performed and x subprocesses. The main process will pop tasks from the queue until it is empty and hand them to the subprocesses to perform. The subprocesses will inform the main process when they are free and get another task.

I thought of using the multiprocessing module, but I'm not sure how to know when each subprocess is finished and ready for the next task.

I tried to use a shared queue - filling it with class objects from the main process and having each subprocess do something like:

def subprocess(shared_queue):
    while not shared_queue.empty():
        class_obj = shared_queue.get()  # multiprocessing.Queue has get(), not pop()
        class_obj.main_func()

However, it turns out I can't fill the queue with my complex classes.
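
For reference, here is a minimal sketch of the queue/worker pattern described above, assuming each task can be reduced to a picklable, module-level function plus its arguments (worker, task_a, task_b and NUM_WORKERS are made-up names for illustration, not from the question):

import multiprocessing

NUM_WORKERS = 4  # "x" in the description above

def task_a(n):          # stand-ins for the real member functions
    print(n)

def task_b(text):
    print(text.upper())

def worker(task_queue):
    # Pull (function, args) tuples until a None sentinel arrives.
    # Sentinels are used instead of checking empty(), which is not reliable
    # across processes.
    while True:
        task = task_queue.get()
        if task is None:
            break
        func, args = task
        func(*args)

if __name__ == "__main__":
    task_queue = multiprocessing.Queue()
    for task in [(task_a, (1,)), (task_b, ("hello",))]:
        task_queue.put(task)
    for _ in range(NUM_WORKERS):
        task_queue.put(None)          # one sentinel per worker
    workers = [multiprocessing.Process(target=worker, args=(task_queue,))
               for _ in range(NUM_WORKERS)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()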

Edit: I think Pool wouldn't work, as I want to run many different functions, each of them once. The examples I saw with Pool run one function many times with different parameters.

Edit 2: Pool will work for the original problem, by passing functions as parameters, as suggested in the comments. But I still want a solution where I manage the queue, because later on I would like to give each task a weight and run tasks such that their sum of weights doesn't exceed a threshold. So I still need to know when subprocesses finish their tasks.

user287263
  • You need to look at `pool`, which will keep `x` processes running from a possibly substantially larger list of tasks. Also see [multithreaded pool](https://stackoverflow.com/questions/3033952/threading-pool-similar-to-the-multiprocessing-pool) – quamrana Sep 19 '21 at 15:29
  • But pool runs one function with different parameters. I saw examples with two different functions, like this one: [link](https://stackoverflow.com/questions/6976372/mulitprocess-pools-with-different-functions), but I need many different functions, each of them run once – user287263 Sep 20 '21 at 07:00
  • 2
    Functions can be parameters. Write a function that calls a function parameter. – Mark Tolonen Sep 20 '21 at 07:07
  • Thanks. I still want a solution where I manage the queue, because later on I would like to give each task a weight and run tasks such that their sum of weights doesn't cross a threshold. So I still need to know when subprocesses finish their task. – user287263 Sep 20 '21 at 07:25

1 Answer


Would imap or imap_unordered work for you? You can retrieve finished results by using them as an iterator, and you could stop the running tasks as described here: How to kill threads spawned when using the multiprocessing's Pool imap_unordered

The full result would look something like this (untested):

import multiprocessing

def function_caller(func_and_args):  # Edited, thanks for the comment
    # Unpack the (function, args) pair, run it and return the result.
    func, args = func_and_args
    return func(*args)

if __name__ == "__main__":
    funcs_and_args = [[func1, args_for_func1], [func2, args_for_func2], ...]
    with multiprocessing.Pool() as pool:
        for result in pool.imap(function_caller, funcs_and_args):
            do_something_with_the_result()
            if weight_reached():
                pool.terminate()

Idea: You first create a list of functions and their input arguments. Next you need a wrapper that runs each function with its respective args, which is what function_caller does. Then you open a pool and run imap as an iterator. Once you have reached the weight limit, you can terminate the pool and thereby end all still-running subprocesses. Note that this means your subfunctions should be able to die gracefully (i.e. they should not be altering any files or anything similar while running, since otherwise you wouldn't know how far they got in their work).

Edit 1: With the new explanation I am not sure whether there is a built-in way to achieve what you are after. What you could do nevertheless is the following:

import multiprocessing
import time

def function_caller(func, args):  # Edited, thanks for the comment
    func(*args)

if __name__ == "__main__":
    funcs_and_args = [[func1, args_for_func1], [func2, args_for_func2], ...]
    results = []
    running_processes = []
    index = 0
    while len(results) != len(funcs_and_args):
        if all_weight_used() or index == len(funcs_and_args):
            time.sleep(10)
        else:
            p = multiprocessing.Process(target=function_caller,
                                        args=funcs_and_args[index])
            p.start()
            running_processes.append(p)
            index += 1
        for p in running_processes:
            pass  # check if alive and retrieve the result if not

Tbh I am not 100% sure what the right code for the last part is, but some combination of p.is_alive() and a multiprocessing.Queue() should probably do the job. Maybe somebody else can edit the answer to complete it.
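
Not part of the original answer, but here is a rough sketch of how that last part could look: each task reports back over a result queue, and the main loop only launches new tasks while enough weight is free. The task functions (task_a, task_b), the per-task weights and MAX_WEIGHT are made-up stand-ins; a p.is_alive() check could be layered on top for tasks that die without reporting a result.

import multiprocessing
import time

def task_a(n):          # stand-ins for the real functions
    return n * n

def task_b(text):
    return text.upper()

def function_caller(result_queue, index, func, args):
    # Run the task and report (index, result) back to the main process.
    result_queue.put((index, func(*args)))

if __name__ == "__main__":
    funcs_and_args = [(task_a, (3,)), (task_b, ("hello",))]
    weights = [2, 1]            # made-up per-task weights
    MAX_WEIGHT = 2              # made-up weight threshold
    result_queue = multiprocessing.Queue()
    results = {}
    running = {}                # task index -> Process
    used_weight = 0
    index = 0
    while len(results) != len(funcs_and_args):
        # Launch tasks while there is spare weight and work left.
        while (index < len(funcs_and_args)
               and used_weight + weights[index] <= MAX_WEIGHT):
            func, args = funcs_and_args[index]
            p = multiprocessing.Process(
                target=function_caller,
                args=(result_queue, index, func, args))
            p.start()
            running[index] = p
            used_weight += weights[index]
            index += 1
        # Collect finished tasks and give their weight back.
        while not result_queue.empty():
            i, res = result_queue.get()
            results[i] = res
            running.pop(i).join()
            used_weight -= weights[i]
        time.sleep(0.1)
    print(results)

The weight bookkeeping stays entirely in the main process, which also answers the question of knowing when each subprocess has finished.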

C Hecht
  • Thanks! I think I didn't explain myself correctly. All tasks should run eventually. The weights just determine how many tasks can run at the same time. When a task is finished, I want to subtract its weight from the sum and make room for a new task. At any given moment the sum of weights of the running tasks shouldn't exceed a weight threshold – user287263 Sep 20 '21 at 08:08
  • I believe you when you say your code is untested because you cannot use a lambda function as the *func* argument for `Pool.imap`. – Booboo Sep 20 '21 at 15:23
  • @user287263 The `multiprocessing.pool.Pool.apply_async` method has a *callback* argument specifying a function to be called with a result when it becomes available. That is one way of knowing when a task is finished (see the sketch after these comments). – Booboo Sep 20 '21 at 15:27
  • @Booboo: Thanks for the hint. I edited the answer to include a normal function. – C Hecht Sep 21 '21 at 07:19
  • @user287263: Thanks for clarifying. I edited my answer to address your question. I am not 100% sure how the part where you retrieve results works, but you will be able to find it with a bit of trial and error. – C Hecht Sep 21 '21 at 07:20
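
Following up on the apply_async suggestion in the comments above, a minimal sketch (not from the answer) of how a callback can signal when each task finishes; task_a and task_b are made-up stand-ins for the real functions:

import multiprocessing

def task_a(n):                      # stand-in tasks
    return n * n

def task_b(text):
    return text.upper()

def on_done(result):
    # Runs in the main process as soon as the corresponding task finishes.
    print("finished with result:", result)

if __name__ == "__main__":
    tasks = [(task_a, (3,)), (task_b, ("hello",))]
    with multiprocessing.Pool(processes=2) as pool:
        async_results = [pool.apply_async(func, args, callback=on_done)
                         for func, args in tasks]
        for r in async_results:
            r.wait()                # block until every task has completed

The pool size caps how many tasks run at once, and the callback fires as each task completes, which is one way of knowing when a subprocess has finished its task.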