I have a function called run_3_processes, which spawns 3 processes (duh) using multiprocessing.pool.apply, waits for their results, processes them and returns a single result.

I have another function called run_3_processes_3_times, which should run run_3_processes 3 times in parallel, wait for all of them to return and then process all their results.

Things I tried:

  1. using a process pool for run_3_processes_3_times - turns out this is complicated because pool workers are daemonic and are not allowed to create child processes (see "Python Process Pool non-daemonic?")
  2. rewriting the entire application code to spawn 9 processes with the same pool - this really convoluted my code and breaks encapsulation
  3. using ThreadPool.apply for run_3_processes_3_times - for some reason this makes it run serially, not in parallel - is it because the apply in run_3_processes blocks the GIL?

I'm sure there's a one-liner solution I'm missing... thanks!

ihadanny
  • I'm confused: are you using both *processes* and *threads* at the same time? Are the threads spawning new processes, or are the processes first forking and then spawning threads? If `run_3_processes` starts 3 new processes using `fork` (or some other call that eventually calls `fork`) then those processes can truly run in parallel with no GIL problems. But I'm not sure what `run_3_processes_3_times` is actually doing. Can you post the code? – Z4-tier Mar 22 '20 at 14:58
  • @Z4-tier - `run_3_processes_3_times` just needs to call `run_3_processes` and wait. That's why I thought lightweight threads are fitting here. `run_3_processes` actually spawns heavy-lifting processes (with `multiprocessing.pool.apply`, but I guess you're right and it calls `fork` down the road). But for some reason it won't give up the GIL, unless I use `apply_async` like below – ihadanny Mar 22 '20 at 15:03

2 Answers

Since you're using a combination of true threads and sub-processes, you will "sort of" run into the GIL, but the way it's structured makes it seem unlikely that it will be a problem. The ThreadPool will be subject to context switches to give concurrency between the threads, but since its only purpose is to spawn the child processes, it's not doing anything CPU-intensive. I'm not sure why it's even necessary to use multiple threads; I would probably just have a single-threaded parent process spawn and wait on the child processes directly.

In both functions, it's probably more idiomatic to use the map() method instead of apply_async(), although both will work. Usually that would look a bit like this:

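import multiprocessing as mp

process_count = 3

def pre_process(input_data):
    # Build independent lists; [[]] * process_count would repeat one shared list.
    input_subsets = [[] for _ in range(process_count)]
    for idx, data_point in enumerate(input_data):
        # <do any input validation on data_point>
        input_subsets[idx % process_count].append(data_point)
    return input_subsets

def process_data(input_data):
    return_val = []
    for val in input_data:
        # <do some processing work>
        return_val.append(val)  # placeholder: append <result of processing> instead
    return return_val

if __name__ == "__main__":
    raw_data = list(range(9))  # placeholder input, substitute the real data
    data_subsets = pre_process(raw_data)
    # The pool is torn down once map() has returned all results.
    with mp.Pool(process_count) as pool:
        result_list = pool.map(process_data, data_subsets)
    # <check result_list>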
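
On the encapsulation worry raised in the question: one way to share a single pool without the caller having to untangle a flat result list is to tag each task with its context and regroup afterwards. This is only a sketch under assumptions: `do_it`, `run_contexts`, and the 3x3 layout are hypothetical stand-ins for the real work, and the pool is passed in so the same function works with either `mp.Pool` or `ThreadPool`.

```python
import multiprocessing as mp
from collections import defaultdict

def do_it(context, i):
    # Hypothetical worker: stands in for the real heavy-lifting task.
    return context * 10 + i

def run_contexts(pool, n_contexts=3, n_tasks=3):
    # One flat list of (context, task) pairs, so a single pool serves all of them.
    jobs = [(c, i) for c in range(n_contexts) for i in range(n_tasks)]
    results = pool.starmap(do_it, jobs)  # results come back in job order
    grouped = defaultdict(list)
    for (c, _), value in zip(jobs, results):
        grouped[c].append(value)
    return dict(grouped)

if __name__ == "__main__":
    with mp.Pool(processes=9) as pool:
        print(run_contexts(pool))
```

Each context's results end up under their own key, so any per-context post-processing can stay separate even though all nine tasks share one pool.
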
Z4-tier
  • thanks! but that's a code rewrite that's breaking encapsulation. `data_subsets` would now have 9 elements, and the post processing `check result_list` would have to discern which element belongs in which of the 3 contexts... – ihadanny Mar 23 '20 at 21:56
  • (1) `data_subsets` will have 3 elements, not 9. (2) Why would `check_result_list` need to know which worker process produced a particular member of the result set? If anything is breaking encapsulation, it's that. If that is the case, you could replace the `map()` call with `map_async()` and use the optional parameter `callback=[callable]` to implement `check_result`. – Z4-tier Mar 24 '20 at 14:36
  • Maybe I am missing something, but I don't understand why multiple threads are needed to start the child worker processes. That can easily be done single threaded, and multi-threading will run into the GIL anyway, so not really much benefit that I can see. Multi-threading might actually make it slower, owing to the cost of the context switching between the threads on a single CPU. – Z4-tier Mar 24 '20 at 14:53
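
The `map_async()` plus `callback` suggestion from the comments might look roughly like this. `process_data`, `check_result`, and `run_with_callback` are hypothetical stand-ins. The callback receives the complete result list once every subset has been processed, and it runs in the parent's result-handling thread, so it should return quickly.

```python
import multiprocessing as mp

def process_data(subset):
    # Hypothetical processing step: square every value in the subset.
    return [x * x for x in subset]

def run_with_callback(pool, data_subsets):
    checked = []

    def check_result(result_list):
        # Called once with the full list of per-subset results, in input order.
        checked.append(sum(len(r) for r in result_list))

    async_result = pool.map_async(process_data, data_subsets, callback=check_result)
    result_list = async_result.get()  # blocks until all subsets are done
    return result_list, checked

if __name__ == "__main__":
    with mp.Pool(3) as pool:
        print(run_with_callback(pool, [[1, 2], [3, 4], [5, 6]]))
```
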
ok, found a hacky answer, would love to hear if there's something better:


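import multiprocessing as mp
from multiprocessing.pool import ThreadPool

def run_3_processes_3_times():
    # Threads are enough here: each one only waits on its own process pool.
    pool = ThreadPool(3)
    # The trailing comma matters: args must be a tuple, and (c) is just c.
    candidates = [pool.apply_async(run_3_processes, args=(c,))
                  for c in range(3)]
    candidates = [x.get() for x in candidates]  # get() blocks until each call is done
    pool.close()
    pool.join()
    return candidates

def run_3_processes(c):
    pool = mp.Pool(3)
    # do_it is the worker function (defined elsewhere); child processes must be able to import it.
    solutions = [pool.apply_async(do_it, args=(i,))
                 for i in range(3)]
    solutions = [x.get() for x in solutions]
    pool.close()
    pool.join()
    return solutions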
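
As for why `apply` looked serial: `apply(f, args)` blocks the caller until the result is ready, so three consecutive `apply` calls from one thread run back to back regardless of pool size, and the GIL is not the cause. `apply_async` returns immediately and only `get()` blocks. A small timing sketch, with `time.sleep` as a hypothetical stand-in for waiting on child processes (`wait_a_bit`, `timed`, `with_apply`, and `with_apply_async` are illustrative names):

```python
import time
from multiprocessing.pool import ThreadPool

def wait_a_bit(c):
    # Stand-in for run_3_processes: the thread just waits, as it would on child processes.
    time.sleep(0.2)
    return c

def timed(submit):
    # Run one submission strategy against a fresh 3-thread pool and time it.
    start = time.perf_counter()
    with ThreadPool(3) as pool:
        results = submit(pool)
    return results, time.perf_counter() - start

def with_apply(pool):
    # Each apply() returns only after its call has finished.
    return [pool.apply(wait_a_bit, args=(c,)) for c in range(3)]

def with_apply_async(pool):
    # All three calls are submitted first, then collected.
    pending = [pool.apply_async(wait_a_bit, args=(c,)) for c in range(3)]
    return [p.get() for p in pending]

if __name__ == "__main__":
    print("apply:       %.2fs" % timed(with_apply)[1])
    print("apply_async: %.2fs" % timed(with_apply_async)[1])
```

The `apply` variant should take roughly three times as long as the `apply_async` one, since its three waits cannot overlap.
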

ihadanny