
I have to do CPU-bound tasks; every task is assigned to a process with multiprocessing.Pool:

with multiprocessing.Pool(3) as p:
    results = list(p.map(task, [args1, args2, args3, args4, ..., argsn]))

Every task ends with a for loop that could itself be parallelized with multiprocessing.Pool, but when I do that I get:

AssertionError: daemonic processes are not allowed to have children

I know one possible solution is "Python Process Pool non-daemonic?", but my question is: should I make the pool processes non-daemonic, or is that unsafe?
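
For context, the recipe from that linked question works by making the pool's worker processes claim to be non-daemonic, so they are allowed to spawn children. A minimal sketch of that idea, assuming Python 3.8+ (NestablePool and NoDaemonProcess are just illustrative names):

import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    # Always report the worker as non-daemonic and ignore the pool's
    # attempt to set daemon=True, so the worker may have children.
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass

# Subclass multiprocessing.pool.Pool (multiprocessing.Pool is only a
# factory function, not a class) so it builds NoDaemonProcess workers
# instead of the usual daemonic ones:
class NestablePool(multiprocessing.pool.Pool):
    @staticmethod
    def Process(ctx, *args, **kwds):
        return NoDaemonProcess(*args, **kwds)

The main safety caveat is that non-daemonic workers are not terminated automatically when the parent dies, so a crash can leave orphaned processes behind.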

Now I do this:


# subtask
def update(args):
    ...
    return updated_a


# task
def task(args):
    ...
    for i in range(200):
        # evaluations
        ...
        with multiprocessing.get_context('spawn').Pool(len(self.arraies)) as p:
            self.arraies = list(p.map(update, [[..., a] for a in self.arraies]))
    ...
    return result


...
ss = np.random.SeedSequence()
tasks_seeds = ss.spawn(N_ITERATIONS + 1)
streams = [np.random.default_rng(s) for s in tasks_seeds]
results = []
with multiprocessing.get_context('spawn').Pool(3) as p:
    results = list(p.map(task, [[..., streams[i]] for i in range(N_ITERATIONS)]))
...
  
ZC13
  • Is there a reason why you have to run a Pool inside of a Pool instead of running the final tasks in the initial pool as well? The idea is usually for "the Pool" to have all the processing resources available to you; spawning another means resources are congested and performance gets worse. – MisterMiyagi Dec 01 '22 at 12:28
  • I have to run the same task n times (n is a user input; every task has a random initialization) and at the end I compare all the different results. Now, every task does different subtasks, and one specific subtask k times. Does this make sense? – ZC13 Dec 01 '22 at 12:58
  • It's as if I launch the same program n times and then parallelize that program. Are there any different solutions? – ZC13 Dec 01 '22 at 13:00
  • "Does this make sense?" That's a description of your domain task, not the programming task. Why do you need to run the "k times a specific subtask" as Pool'd child process of the original Pool children? – MisterMiyagi Dec 01 '22 at 13:17
  • Sorry, I didn't get the question. What I called a subtask is a function that takes as input an object O that has k two-dimensional arrays, and the subtask has to update the arrays with a specific mathematical update function. The update function is the same, but all the arrays are different. The "task" program creates O, initializes it randomly, then calls the update function 200 times on each array (with 2 nested for loops). At the end of the 200 iterations all the arrays converge, task returns, and I compare all the converged objects O in the main. I wanted to create child processes of task to speed this up – ZC13 Dec 01 '22 at 13:55
  • How does any of that relate to having to use multiple, nested multiprocessing Pools? – MisterMiyagi Dec 01 '22 at 13:57
  • What you would like to do is pass the pool itself to a multiprocessing pool task, so that the task could submit more subtasks to the original pool. But, of course, you cannot pickle a pool. But what if you create a multithreading pool and a multiprocessing pool? You then submit your main tasks to the multithreading pool, passing explicitly or implicitly (via a global) the multiprocessing pool. All these multithreaded tasks do is submit tasks to the multiprocessing pool and wait for the returned results (little CPU processing and mostly waiting), which are returned to the main process. – Booboo Dec 01 '22 at 16:17
  • (...continued) Any CPU-intensive processing required by the multithreading tasks (such as initialization of objects) can be performed by submitting a task to the multiprocessing pool (although that might not improve performance unless this processing is sufficiently CPU-intensive). – Booboo Dec 01 '22 at 16:19
  • I edited my last comment and added my code without the extra stuff – ZC13 Dec 01 '22 at 16:19
  • So what I have to do is define NP in Pool(NP) as the number of processes I want "my program" to use (I suppose I have to set NP > N_ITERATIONS, am I right?) and then create a ThreadPool that will pass the subtasks to the pool, which will manage them. Normally the Python interpreter allows only one thread to execute at a time, but these threads won't be "standard" Python threads, did I get it right? – ZC13 Dec 01 '22 at 16:39

1 Answer


This is a bit too long to answer as a comment, and so ...

If what these tasks are doing is all or mostly all CPU processing with very little waiting, then you should not be creating a processing pool with more processes than the number of CPU cores you have. See below for the general idea. Alternatively, instead of using a multithreading pool at all, you could simply have a single multiprocessing pool and restructure the code to do the iterating in the main process (a sketch of that alternative follows the code below):

from multiprocessing.pool import ThreadPool, Pool
import multiprocessing
from functools import partial

# subtask
def update(args): 
   ...
   return updated_a


# task
# This runs in a multithreading pool, but it
# is mostly waiting for the multiprocessing pool to generate
# results:
def task(process_pool, args):
    ...
    for i in range(200):
        # evaluations
        ...
        self.arraies = list(process_pool.map(update, [[..., a] for a in self.arraies]))
    ...
    return result


# required for spawned processes:
if __name__ == '__main__':
    ...
    ss = np.random.SeedSequence()
    tasks_seeds = ss.spawn(N_ITERATIONS + 1)
    streams = [np.random.default_rng(s) for s in tasks_seeds]
    # Use number of cpu cores:
    with multiprocessing.get_context('spawn').Pool() as process_pool:
        task_args = [[...,streams[i]] for i in range(N_ITERATIONS)]
        # Limit thread pool size to a maximum of 200 (rather arbitrary):
        with ThreadPool(min(200, len(task_args))) as thread_pool:
            # pass the pool as the first argument:
            worker = partial(task, process_pool)
            results = thread_pool.map(worker, task_args)
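
For completeness, here is a minimal, runnable sketch of the alternative mentioned above: a single multiprocessing pool, with the 200-iteration loop driven from the main process. The update body and the init_arrays helper are hypothetical stand-ins for your real mathematical update and random initialization:

import multiprocessing

import numpy as np

def update(a):
    # hypothetical stand-in for the real update rule
    return a * 0.5 + 1.0

def init_arrays(rng, k=4, shape=(8, 8)):
    # hypothetical stand-in for the random initialization of one object O
    return [rng.random(shape) for _ in range(k)]

if __name__ == '__main__':
    N_ITERATIONS = 3
    ss = np.random.SeedSequence()
    streams = [np.random.default_rng(s) for s in ss.spawn(N_ITERATIONS)]
    all_arrays = [init_arrays(rng) for rng in streams]
    k = len(all_arrays[0])

    # One pool sized to the CPU count; no nesting required.
    with multiprocessing.get_context('spawn').Pool() as pool:
        for _ in range(200):
            # Flatten every array of every task into one map call so the
            # single pool sees all subtasks at once ...
            flat = [a for arrays in all_arrays for a in arrays]
            updated = pool.map(update, flat)
            # ... then regroup: each task owns k consecutive results.
            all_arrays = [updated[i * k:(i + 1) * k]
                          for i in range(N_ITERATIONS)]
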
Booboo